Compare commits

...

27 Commits

Author SHA1 Message Date
Vlad Lazar
b2cb10590e fixup: deserialize shenanigans 2024-09-12 20:00:06 +01:00
Vlad Lazar
2923fd2a5b fixup: remove stale import 2024-09-12 19:25:46 +01:00
Vlad Lazar
2a5336b9ab fixup image deserialization 2024-09-12 19:24:41 +01:00
Christian Schwarz
6f20726610 Merge remote-tracking branch 'origin/hackaneon/lisbon24/superscalar-page_service--problame/evaluate-debouncer' into hackaneon/lisbon24/superscalar-page_service 2024-09-12 17:47:16 +00:00
Christian Schwarz
29f741e1e9 debounce: actually issue vectored get 2024-09-12 17:46:30 +00:00
Vlad Lazar
2b37a40079 Materialize future ios 2024-09-12 18:25:17 +01:00
Vlad Lazar
af2b65a2fb Rework issuing of IOs on read path 2024-09-12 16:42:35 +01:00
Christian Schwarz
5d194c7824 debounce: bounce if shard or effective request_lsn differ 2024-09-12 14:20:32 +00:00
Christian Schwarz
ac2702afd3 deboucner: move decoding into debounce loop 2024-09-12 10:58:09 +00:00
Christian Schwarz
88fd46d795 sketch interface 2024-09-12 11:35:00 +01:00
Christian Schwarz
2d6763882e pagebench: fake queue depth of 10 2024-09-12 11:35:00 +01:00
Christian Schwarz
c0c23cde72 debouncer 2024-09-12 11:35:00 +01:00
Christian Schwarz
942bc9544b fixup 2024-09-11 20:04:39 +00:00
Christian Schwarz
02b7cdb305 HACK: instrument page_service to count nonblocking consecutive getpage requests 2024-09-11 19:25:19 +01:00
Alexander Bayandin
7d7d1f354b Fix rust warnings on macOS (#8955)
## Problem
```
error: unused import: `anyhow::Context`
 --> libs/utils/src/crashsafe.rs:8:5
  |
8 | use anyhow::Context;
  |     ^^^^^^^^^^^^^^^
  |
  = note: `-D unused-imports` implied by `-D warnings`
  = help: to override `-D warnings` add `#[allow(unused_imports)]`

error: unused variable: `fd`
   --> libs/utils/src/crashsafe.rs:209:15
    |
209 | pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
    |               ^^ help: if this is intentional, prefix it with an underscore: `_fd`
    |
    = note: `-D unused-variables` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(unused_variables)]`
```

## Summary of changes
- Fix rust warnings on macOS
2024-09-07 08:17:25 +01:00
Cihan Demirci
16c200d6d9 push images to prod ACR (#8940)
Used `vars` for new storing non-sensitive information, changed dev
secrets to vars as well but
didn't cleanup any secrets.

https://github.com/neondatabase/cloud/issues/16925

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-09-07 00:20:36 +01:00
Joonas Koivunen
3dbd34aa78 feat(storcon): forward gc blocking and unblocking (#8956)
Currently using gc blocking and unblocking with storage controller
managed pageservers is painful. Implement the API on storage controller.

Fixes: #8893
2024-09-06 22:42:55 +01:00
Arpad Müller
fa3fc73c1b Address 1.82 clippy lints (#8944)
Addresses the clippy lints of the beta 1.82 toolchain.

The `too_long_first_doc_paragraph` lint complained a lot and was
addressed separately: #8941
2024-09-06 21:05:18 +02:00
Alex Chi Z.
ac5815b594 feat(storage-controller): add node shards api (#8896)
For control-plane managed tenants, we have the page in the admin console
that lists all tenants on a specific pageserver. But for
storage-controller managed ones, we don't have that functionality for
now.

## Summary of changes

Adds an API that lists all shards on a given node (intention + observed)

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-06 14:14:21 -04:00
Alexander Bayandin
30583cb626 CI(label-for-external-users): add retry logic for unexpected errors (#8938)
## Problem

One of the PRs opened by a `neondatabase` org member got labelled as
`external` because the `gh api` call failed in the wrong way:
```
Get "https://api.github.com/orgs/neondatabase/members/<username>": dial tcp 140.82.114.5:443: i/o timeout
is-member=false
```

## Summary of changes
- Check that the error message is expected before labelling PRs
- Retry `gh api` call for 10 times in case of unexpected error messages
- Add `workflow_dispatch` trigger
2024-09-06 17:42:35 +01:00
Arseny Sher
c1a51416db safekeeper: fsync filesystem on start.
We can't really rely on files contents after boot without fsync'ing
them.
2024-09-06 19:14:25 +03:00
Arseny Sher
8eab7009c1 safekeeper: do pid file lock before id init 2024-09-06 19:14:25 +03:00
Arseny Sher
11cf16e3f3 safekeeper: add term_bump endpoint.
When walproposer observes now higher term it restarts instead of
crashing whole compute with PANIC; this avoids compute crash after
term_bump call. After successfull election we're still checking
last_log_term of the highest given vote to ensure basebackup is good,
and PANIC otherwise.

It will be used for migration per
035-safekeeper-dynamic-membership-change.md
and
https://github.com/neondatabase/docs/pull/21

ref https://github.com/neondatabase/neon/issues/8700
2024-09-06 19:13:50 +03:00
Folke Behrens
af6f63617e proxy: clean up code and lints for 1.81 and 1.82 (#8945) 2024-09-06 17:13:30 +02:00
Arseny Sher
e287f36a05 safekeeper: fix endpoint restart immediately after xlog switch.
Check that truncation point is not from the future by comparing it with
write_record_lsn, not write_lsn, and explain that xlog switch changes
their normal order.

ref https://github.com/neondatabase/neon/issues/8911
2024-09-06 18:09:21 +03:00
Arpad Müller
cbcd4058ed Fix 1.82 clippy lint too_long_first_doc_paragraph (#8941)
Addresses the 1.82 beta clippy lint `too_long_first_doc_paragraph` by
adding newlines to the first sentence if it is short enough, and making
a short first sentence if there is the need.
2024-09-06 14:33:52 +02:00
Vlad Lazar
e86fef05dd storcon: track preferred AZ for each tenant shard (#8937)
## Problem
We want to do AZ aware scheduling, but don't have enough metadata.

## Summary of changes
Introduce a `preferred_az_id` concept for each managed tenant shard.

In a future PR, the scheduler will use this as a soft preference. 
The idea is to try and keep the shard attachments within the same AZ.
Under the assumption that the compute was placed in the correct AZ,
this reduces the chances of cross AZ trafic from between compute and PS.

In terms of code changes we:
1. Add a new nullable `preferred_az_id` column to the `tenant_shards`
table. Also include an in-memory counterpart.
2. Populate the preferred az on tenant creation and shard splits.
3. Add an endpoint which allows to bulk-set preferred AZs.

(3) gives us the migration path. I'll write a script which queries the
cplane db in the region and sets the preferred az of all shards with an 
active compute to the AZ of said compute. For shards without an active compute, 
I'll use the AZ of the currently attached pageserver
since this is what cplane uses now to schedule computes.
2024-09-06 13:11:17 +01:00
96 changed files with 2188 additions and 717 deletions

View File

@@ -7,6 +7,13 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
- AZURE_PROD_CLIENT_ID
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER

56
.github/workflows/_push-to-acr.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: Push images to ACR
on:
workflow_call:
inputs:
client_id:
description: Client ID of Azure managed identity or Entra app
required: true
type: string
image_tag:
description: Tag for the container image
required: true
type: string
images:
description: Images to push
required: true
type: string
registry_name:
description: Name of the container registry
required: true
type: string
subscription_id:
description: Azure subscription ID
required: true
type: string
tenant_id:
description: Azure tenant ID
required: true
type: string
jobs:
push-to-acr:
runs-on: ubuntu-22.04
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
steps:
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ inputs.client_id }}
subscription-id: ${{ inputs.subscription_id }}
tenant-id: ${{ inputs.tenant_id }}
- name: Login to ACR
run: |
az acr login --name=${{ inputs.registry_name }}
- name: Copy docker images to ACR ${{ inputs.registry_name }}
run: |
images='${{ inputs.images }}'
for image in ${images}; do
docker buildx imagetools create \
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
neondatabase/${image}:${{ inputs.image_tag }}
done

View File

@@ -794,9 +794,6 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml down
promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
@@ -823,28 +820,6 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2
- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done
- name: Add latest tag to images
if: github.ref_name == 'main'
run: |
@@ -882,6 +857,30 @@ jobs:
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
push-to-acr-dev:
if: github.ref_name == 'main'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
push-to-acr-prod:
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ tag, promote-images ]
uses: ./.github/workflows/_push-to-acr.yml
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, tag ]
runs-on: ubuntu-22.04
@@ -957,8 +956,8 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -7,6 +7,11 @@ on:
pull_request_target:
types:
- opened
workflow_dispatch:
inputs:
github-actor:
description: 'GitHub username. If empty, the username of the current user will be used'
required: false
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -26,12 +31,31 @@ jobs:
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
ACTOR: ${{ inputs.github-actor || github.actor }}
run: |
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
else
is_member=false
fi
expected_error="User does not exist or is not a member of the organization"
output_file=output.txt
for i in $(seq 1 10); do
if gh api "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${ACTOR}" \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" > ${output_file}; then
is_member=true
break
elif grep -q "${expected_error}" ${output_file}; then
is_member=false
break
elif [ $i -eq 10 ]; then
title="Failed to get memmbership status for ${ACTOR}"
message="The latest GitHub API error message: '$(cat ${output_file})'"
echo "::error file=.github/workflows/label-for-external-users.yml,title=${title}::${message}"
exit 1
fi
sleep 1
done
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}

View File

@@ -207,7 +207,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
export PATH="$HOME/.cargo/bin:$PATH" && \
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools-preview rustfmt clippy && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \

View File

@@ -22,9 +22,10 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal. Wrapping the result
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// Escape a string for including it in a SQL literal.
///
/// Wrapping the result with `E'{}'` or `'{}'` is not required,
/// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
/// for the original implementation.
pub fn escape_literal(s: &str) -> String {

View File

@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -80,7 +80,10 @@ enum Command {
/// List nodes known to the storage controller
Nodes {},
/// List tenants known to the storage controller
Tenants {},
Tenants {
/// If this field is set, it will list the tenants on a specific node
node_id: Option<NodeId>,
},
/// Create a new tenant in the storage controller, and by extension on pageservers.
TenantCreate {
#[arg(long)]
@@ -403,7 +406,41 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::Tenants {} => {
Command::Tenants {
node_id: Some(node_id),
} => {
let describe_response = storcon_client
.dispatch::<(), NodeShardResponse>(
Method::GET,
format!("control/v1/node/{node_id}/shards"),
None,
)
.await?;
let shards = describe_response.shards;
let mut table = comfy_table::Table::new();
table.set_header([
"Shard",
"Intended Primary/Secondary",
"Observed Primary/Secondary",
]);
for shard in shards {
table.add_row([
format!("{}", shard.tenant_shard_id),
match shard.is_intended_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
match shard.is_observed_secondary {
None => "".to_string(),
Some(true) => "Secondary".to_string(),
Some(false) => "Primary".to_string(),
},
]);
}
println!("{table}");
}
Command::Tenants { node_id: None } => {
let mut resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,

View File

@@ -68,6 +68,7 @@ macro_rules! register_uint_gauge {
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
///
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {

View File

@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -74,6 +74,17 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
pub updated: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -101,6 +112,21 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,
pub shards: Vec<NodeShard>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
pub tenant_shard_id: TenantShardId,
/// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_observed_secondary: Option<bool>,
/// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
pub is_intended_secondary: Option<bool>,
}
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
pub id: NodeId,
@@ -132,8 +158,12 @@ pub struct TenantDescribeResponseShard {
pub is_splitting: bool,
pub scheduling_policy: ShardSchedulingPolicy,
pub preferred_az_id: Option<String>,
}
/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low level operation
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)

View File

@@ -305,8 +305,10 @@ pub struct TenantConfig {
pub lsn_lease_length_for_ts: Option<String>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// The policy for the aux file storage.
///
/// It can be switched through `switch_aux_file_policy` tenant config.
/// When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
@@ -896,7 +898,9 @@ pub struct WalRedoManagerStatus {
pub process: Option<WalRedoManagerProcessStatus>,
}
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
/// The progress of a secondary tenant.
///
/// It is mostly useful when doing a long running download: e.g. initiating
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
/// what's happening.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]

View File

@@ -69,8 +69,10 @@ impl QueryError {
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
/// or the client closing the connection.
///
/// These errors can happen during normal operations,
/// and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(

View File

@@ -7,6 +7,7 @@ use std::fmt;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
///
/// The `host` part should be a correct `url::Host`, while `port` (if present) should be
/// a valid decimal u16 of digits only.
pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>), anyhow::Error> {

View File

@@ -45,6 +45,8 @@ pub use azure_core::Etag;
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
/// Default concurrency limit for S3 operations
///
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -300,7 +302,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<(), TimeTravelError>;
}
/// DownloadStream is sensitive to the timeout and cancellation used with the original
/// Data part of an ongoing [`Download`].
///
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)

View File

@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// bump to
pub term: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
// before the request
pub previous_term: u64,
pub current_term: u64,
}

View File

@@ -5,9 +5,10 @@
mod calculation;
pub mod svg;
/// StorageModel is the input to the synthetic size calculation. It represents
/// a tree of timelines, with just the information that's needed for the
/// calculation. This doesn't track timeline names or where each timeline
/// StorageModel is the input to the synthetic size calculation.
///
/// It represents a tree of timelines, with just the information that's needed
/// for the calculation. This doesn't track timeline names or where each timeline
/// begins and ends, for example. Instead, it consists of "points of interest"
/// on the timelines. A point of interest could be the timeline start or end point,
/// the oldest point on a timeline that needs to be retained because of PITR

View File

@@ -5,8 +5,10 @@ use std::{
use metrics::IntCounter;
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// Circuit breakers are for operations that are expensive and fallible.
///
/// If a circuit breaker fails repeatedly, we will stop attempting it for some
/// period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
pub struct CircuitBreaker {
/// An identifier that enables us to log useful errors when a circuit is broken

View File

@@ -1,3 +1,4 @@
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
@@ -203,6 +204,27 @@ pub fn overwrite(
Ok(())
}
/// Syncs the filesystem for the given file descriptor.
#[cfg_attr(target_os = "macos", allow(unused_variables))]
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use anyhow::Context;
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -249,8 +249,10 @@ macro_rules! id_newtype {
};
}
/// Neon timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// Neon timeline ID.
///
/// They are different from PostgreSQL timeline
/// IDs, but serve a similar purpose: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given

View File

@@ -100,7 +100,9 @@ pub enum LockFileRead {
}
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
/// inspect its content.
///
/// It is not an `Err(...)` if the file does not exist or is already locked.
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);

View File

@@ -190,7 +190,7 @@ impl Drop for TracingPanicHookGuard {
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();

View File

@@ -8,6 +8,7 @@ use tracing::{trace, warn};
use crate::lsn::Lsn;
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.

View File

@@ -65,6 +65,8 @@ impl<T> Poison<T> {
}
}
/// Armed pointer to a [`Poison`].
///
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned

View File

@@ -13,10 +13,11 @@ pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
/// Combination of ShardNumber and ShardCount.
///
/// For use within the context of a particular tenant, when we need to know which shard we're
/// dealing with, but do not need to know the full ShardIdentity (because we won't be doing
/// any page->shard mapping), and do not need to know the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,

View File

@@ -49,12 +49,11 @@ use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
///
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, but it also waits for all current readers to
/// finish.
/// (for very long).
///
/// Storing to the Rcu updates the value, making new readers immediately see
/// the new value, but it also waits for all current readers to finish.
pub struct Rcu<V> {
inner: RwLock<RcuInner<V>>,
}

View File

@@ -5,7 +5,9 @@ use std::sync::{
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// `SemaphorePermit`.
///
/// Allows use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].

View File

@@ -7,6 +7,7 @@ pub enum VecMapOrdering {
}
/// Ordered map datastructure implemented in a Vec.
///
/// Append only - can only add keys that are larger than the
/// current max key.
/// Ordering can be adjusted using [`VecMapOrdering`]

View File

@@ -6,9 +6,10 @@ pub enum YieldingLoopError {
Cancelled,
}
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
/// yields to avoid blocking the executor, and after resuming checks the provided
/// cancellation token to drop out promptly on shutdown.
/// Helper for long synchronous loops, e.g. over all tenants in the system.
///
/// Periodically yields to avoid blocking the executor, and after resuming
/// checks the provided cancellation token to drop out promptly on shutdown.
#[inline(always)]
pub async fn yielding_loop<I, T, F>(
interval: usize,

View File

@@ -1,2 +1,20 @@
pub mod mgmt_api;
pub mod page_service;
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
// If file structure is per-kind not per-feature then where to put this?
#[derive(Clone, Copy)]
pub enum BlockUnblock {
Block,
Unblock,
}
impl std::fmt::Display for BlockUnblock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
BlockUnblock::Block => "block",
BlockUnblock::Unblock => "unblock",
};
f.write_str(s)
}
}

View File

@@ -12,6 +12,8 @@ use utils::{
pub use reqwest::Body as ReqwestBody;
use crate::BlockUnblock;
pub mod util;
#[derive(Debug, Clone)]
@@ -454,6 +456,20 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
self.mgmt_api_endpoint,
);
self.request(Method::POST, &uri, ()).await.map(|_| ())
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -142,11 +142,16 @@ impl PagestreamClient {
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
for i in 0..10 {
let mut req = tokio_stream::once(Ok(req.clone()));
self.copy_both.send_all(&mut req).await?;
}
for i in 0..9 {
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;

View File

@@ -37,6 +37,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::crashsafe::syncfs;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::{
@@ -155,23 +156,7 @@ fn main() -> anyhow::Result<()> {
};
let started = Instant::now();
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use std::os::fd::AsRawFd;
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{
// macOS is not a production platform for Neon, don't even bother.
drop(dirfd);
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
compile_error!("Unsupported OS");
}
syncfs(dirfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),

View File

@@ -180,6 +180,8 @@ pub struct PageServerConf {
pub io_buffer_alignment: usize,
}
/// Token for authentication to safekeepers
///
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in

View File

@@ -1,7 +1,9 @@
//! This module defines `RequestContext`, a structure that we use throughout
//! the pageserver to propagate high-level context from places
//! that _originate_ activity down to the shared code paths at the
//! heart of the pageserver. It's inspired by Golang's `context.Context`.
//! Defines [`RequestContext`].
//!
//! It is a structure that we use throughout the pageserver to propagate
//! high-level context from places that _originate_ activity down to the
//! shared code paths at the heart of the pageserver. It's inspired by
//! Golang's `context.Context`.
//!
//! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
//! 1. What high-level activity ([`TaskKind`]) needs this page?

View File

@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
}
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
for _ in 0..self.count {
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
}
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
ctx,
start,
op,
count,
})
}
}
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
Lazy::new(|| {
register_histogram!(
"pageserver_consecutive_nonblocking_getpage_requests",
"Number of consecutive nonblocking getpage requests",
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
)
.unwrap()
});
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = SERIALIZE.lock().unwrap();

View File

@@ -5,14 +5,14 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use once_cell::sync::OnceCell;
use pageserver_api::models::TenantState;
use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse, PagestreamProtocolVersion,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -43,7 +43,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics;
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -577,124 +577,317 @@ impl PageServerHandler {
}
}
loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
let mut batched = None;
'outer: loop {
enum DebouncedFeMessage {
Exists(models::PagestreamExistsRequest),
Nblocks(models::PagestreamNblocksRequest),
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
},
DbSize(models::PagestreamDbSizeRequest),
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
RespondError(Span, PageStreamError),
}
let mut debounce: Option<std::time::Instant> = None;
// return or `?` on protocol error
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
let next_batched: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
.into()
});
let sleep_fut = if let Some(started_at) = debounce {
futures::future::Either::Left(tokio::time::sleep_until(
(started_at + *BOUNCE_TIMEOUT).into(),
))
} else {
futures::future::Either::Right(futures::future::pending())
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batched.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
// check if we can debounce
match (&mut batched, this_msg) {
(None, this_msg) => {
batched = Some(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
DebouncedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logig for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
return *accum_lsn == this_lsn;
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// debounce impl piece
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break None;
}
};
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let (handler_results, span): (
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
_,
) = match batched.take().expect("loop above ensures this") {
DebouncedFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
DebouncedFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
PagestreamFeMessage::GetPage(req) => {
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
span.record("batch_size", pages.len() as u64);
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
{
let npages = pages.len();
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
PagestreamFeMessage::DbSize(req) => {
DebouncedFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
DebouncedFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
smallvec::smallvec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx
)
.instrument(span.clone())
.await,
.await
],
span,
)
}
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(smallvec::smallvec![Err(e)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
@@ -706,6 +899,9 @@ impl PageServerHandler {
res?;
}
}
assert!(batched.is_none(), "we take() earlier");
batched = next_batched;
}
Ok(())
}
@@ -949,60 +1145,30 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_page_at_lsn_request(
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
)
.await?;
);
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
let pages = timeline
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
.await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::Read)
}))
}
@@ -1499,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
);
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}

View File

@@ -9,12 +9,17 @@
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -191,26 +196,184 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let pages = smallvec::smallvec![(tag, blknum)];
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
version: Version<'_>,
ctx: &RequestContext,
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let request_lsn = match version {
Version::Lsn(lsn) => lsn,
Version::Modified(_) => panic!("unsupported"),
};
enum KeyState {
NeedsVectoredGet,
Done(Result<Bytes, PageReconstructError>),
}
let mut key_states = BTreeMap::new();
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(pages.len());
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
let key = rel_block_to_key(tag, blknum);
use std::collections::btree_map::Entry;
let key_state_slot = match key_states.entry((key, response_order)) {
Entry::Occupied(_entry) => unreachable!(
"enumerate makes keys unique, even if batch contains same key twice"
),
Entry::Vacant(entry) => entry,
};
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
if tag.relnode == 0 {
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
))));
continue;
}
let nblocks = match self.get_rel_size(tag, version, ctx).await {
Ok(nblocks) => nblocks,
Err(err) => {
key_state_slot.insert(KeyState::Done(Err(err)));
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
continue;
}
vectored_gets.push(key);
key_state_slot.insert(KeyState::NeedsVectoredGet);
}
// turn vectored_gets into a keyspace
let keyspace = {
// add_key reuqires monotonicity
vectored_gets.sort_unstable();
let mut acc = KeySpaceAccum::new();
for key in vectored_gets
.into_iter()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, request_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
if let Err(err) = &res {
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
}
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
let first_interest = interests.next().unwrap();
let next_interest = interests.peek().is_some();
if !next_interest {
match first_interest.1 {
KeyState::NeedsVectoredGet => {
*first_interest.1 = KeyState::Done(res);
}
KeyState::Done(_) => unreachable!(),
}
continue;
} else {
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
match state {
KeyState::NeedsVectoredGet => {
*state = KeyState::Done(match &res {
Ok(buf) => Ok(buf.clone()),
// this `match` is working around the fact that we cannot Clone the PageReconstructError
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
});
}
KeyState::Done(_) => unreachable!(),
}
}
}
}
}
Err(err) => {
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
for ((_, _), state) in key_states.iter_mut() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
match &err {
GetVectoredError::Cancelled => {
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
)));
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
*state =
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
*state = KeyState::Done(Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into()));
}
}
}
}
};
// get the results into the order in which they were requested
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
smallvec::SmallVec::with_capacity(key_states.len());
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
return_order.sort_unstable_by_key(|(_, idx)| *idx);
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
res.extend(return_order.into_iter().map(|key_states_key| {
match key_states.remove(&key_states_key).unwrap() {
KeyState::Done(res) => res,
KeyState::NeedsVectoredGet => unreachable!(),
}
}));
res
}
// Get size of a database in blocks
@@ -1021,9 +1184,10 @@ impl Timeline {
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
/// updates to the repository.
///
/// It is created by the 'begin_record' function. It is called for each WAL
/// record, so that all the modifications by a one WAL record appear atomic.
pub struct DatadirModification<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
@@ -2048,6 +2212,7 @@ impl<'a> DatadirModification<'a> {
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the

View File

@@ -73,6 +73,21 @@ impl ValueBytes {
Ok(raw[8] == 1)
}
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
Ok(false)
}
}
#[cfg(test)]

View File

@@ -1,8 +1,9 @@
//! Timeline repository implementation that keeps old data in layer files, and
//! the recent changes in ephemeral files.
//!
//! Timeline repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See tenant/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//! See tenant/*_layer.rs files. The functions here are responsible for locating
//! the correct layer for the get/put call, walking back the timeline branching
//! history as needed.
//!
//! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
//! directory. See docs/pageserver-storage.md for how the files are managed.

View File

@@ -1,7 +1,8 @@
//! Describes the legacy now hopefully no longer modified per-timeline metadata stored in
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
//! this struct and it's original serialization format is still needed because they were written a
//! long time ago.
//! Describes the legacy now hopefully no longer modified per-timeline metadata.
//!
//! It is stored in `index_part.json` managed by [`remote_timeline_client`]. For many tenants and
//! their timelines, this struct and its original serialization format is still needed because
//! they were written a long time ago.
//!
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
//! versioning.

View File

@@ -282,9 +282,10 @@ impl BackgroundPurges {
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
@@ -2346,8 +2347,9 @@ pub enum TenantMapError {
ShuttingDown,
}
/// Guards a particular tenant_id's content in the TenantsMap. While this
/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// Guards a particular tenant_id's content in the TenantsMap.
///
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// for this tenant, which acts as a marker for any operations targeting
/// this tenant to retry later, or wait for the InProgress state to end.
///

View File

@@ -2184,6 +2184,8 @@ pub fn remote_timeline_path(
remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
}
/// Obtains the path of the given Layer in the remote
///
/// Note that the shard component of a remote layer path is _not_ always the same
/// as in the TenantShardId of the caller: tenants may reference layers from a different
/// ShardIndex. Use the ShardIndex from the layer's metadata.

View File

@@ -1,4 +1,5 @@
//! In-memory index to track the tenant files on the remote storage.
//!
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.

View File

@@ -8,15 +8,17 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
use tokio::sync::{self};
use utils::bin_ser::BeSer;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::repository::{Value, ValueBytes};
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::key::{Key, DBDIR_KEY};
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use std::cmp::{Ordering, Reverse};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
@@ -79,12 +81,18 @@ pub(crate) enum ValueReconstructSituation {
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
pub(crate) records: Vec<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
pub(crate) img: Option<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
situation: ValueReconstructSituation,
pub(crate) situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
@@ -93,16 +101,57 @@ impl VectoredValueReconstructState {
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
pub(crate) async fn convert(
_key: Key,
from: VectoredValueReconstructState,
) -> Result<ValueReconstructState, PageReconstructError> {
let mut to = ValueReconstructState::default();
ValueReconstructState {
records: state.records,
img: state.img,
for (lsn, fut) in from.records {
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
match value {
Value::WalRecord(rec) => {
to.records.push((lsn, rec));
},
Value::Image(img) => {
assert!(to.img.is_none());
to.img = Some((lsn, img));
}
}
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
if to.img.is_none() {
let (lsn, fut) = from.img.expect("Need an image");
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
to.img = Some((lsn, bytes));
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
Ok(to)
}
/// Bag of data accumulated during a vectored get..
@@ -200,7 +249,8 @@ impl ValuesReconstructState {
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
completes: bool,
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
) -> ValueReconstructSituation {
let state = self
.keys
@@ -208,31 +258,14 @@ impl ValuesReconstructState {
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
ValueReconstructSituation::Continue => {
state.records.push((lsn, value));
}
}
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
if completes && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
@@ -434,10 +467,11 @@ impl ReadableLayer {
}
}
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
/// Layers contain a hint indicating whether they are likely to be used for reads.
///
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
/// when changing the visibility of layers (for example when creating a branch that makes some previously
/// covered layers visible). It should be used for cache management but not for correctness-critical checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it

View File

@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::sync::{self, OnceCell};
use tokio_epoll_uring::IoBuf;
use tracing::*;
@@ -136,10 +135,11 @@ impl Summary {
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
/// Struct representing reference to BLOB in layers.
///
/// Reference contains BLOB offset, and for WAL records it also contains
/// `will_init` flag. The flag helps to determine the range of records
/// that needs to be applied, without reading/deserializing records themselves.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
@@ -223,7 +223,7 @@ pub struct DeltaLayerInner {
index_start_blk: u32,
index_root_blk: u32,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
layer_key_range: Range<Key>,
@@ -787,9 +787,11 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
@@ -979,77 +981,59 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
let max_vectored_read_bytes = self
.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
let mut senders: HashMap<
(Key, Lsn),
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = sync::oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
rx,
);
}
buf = Some(blobs_buf.buf);
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
});
}
}
@@ -1189,7 +1173,14 @@ impl DeltaLayerInner {
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
Some((
BlobMeta {
key,
lsn,
will_init: false,
},
start_offset..end_offset,
))
} else {
None
};

View File

@@ -1,7 +1,9 @@
//! An ImageLayer represents an image or a snapshot of a key-range at
//! one particular LSN. It contains an image of all key-value pairs
//! in its key-range. Any key that falls into the image layer's range
//! but does not exist in the layer, does not exist.
//! one particular LSN.
//!
//! It contains an image of all key-value pairs in its key-range. Any key
//! that falls into the image layer's range but does not exist in the layer,
//! does not exist.
//!
//! An image layer is stored in a file on disk. The file is stored in
//! timelines/<timeline_id> directory. Currently, there are no
@@ -19,7 +21,7 @@
//!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! beginningof the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
@@ -36,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::Timeline;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -50,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -161,7 +164,7 @@ pub struct ImageLayerInner {
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -388,9 +391,11 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
@@ -577,8 +582,16 @@ impl ImageLayerInner {
.0
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() {
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
}
let buf_size = read.size();
if buf_size > max_vectored_read_bytes {
@@ -597,36 +610,33 @@ impl ImageLayerInner {
);
}
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let buf = BytesMut::with_capacity(buf_size);
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
}
};
});
}
}

View File

@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
pub(crate) mod vectored_dio_read;
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
resource_units: GlobalResourceUnits,
}
@@ -381,7 +378,11 @@ impl InMemoryLayer {
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
self.inner
.try_read()
.map(|i| i.file.try_read().map(|i| i.len()).ok())
.ok()
.flatten()
}
pub(crate) fn assert_writable(&self) {
@@ -432,6 +433,10 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut senders: HashMap<
(Key, Lsn),
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -459,6 +464,11 @@ impl InMemoryLayer {
Vec::with_capacity(len as usize),
),
});
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
if will_init {
break;
}
@@ -466,46 +476,42 @@ impl InMemoryLayer {
}
}
// Execute the reads.
let read_from = inner.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let locked = read_from.read().await;
let f = vectored_dio_read::execute(
&*locked,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&read_ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
match read.into_result().expect("we run execute() above") {
Err(e) => {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
let _ = sender
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
}
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
Ok(value_buf) => {
let _ = sender.send(Ok(value_buf.into()));
}
// process the next value in the next iteration of the loop
}
}
}
}
assert!(senders.is_empty());
});
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
@@ -600,7 +606,8 @@ impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
let locked = inner.file.try_read().expect("no contention");
Ok(locked.len())
}
/// Create a new, empty, in-memory layer
@@ -614,9 +621,10 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
let file = Arc::new(tokio::sync::RwLock::new(
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
));
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
Ok(InMemoryLayer {
file_id: key,
@@ -648,7 +656,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.read().await.len();
let SerializedBatch {
raw,
@@ -672,8 +680,13 @@ impl InMemoryLayer {
}
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// FIXME: can't borrow arc
let new_size = {
let mut locked = inner.file.write().await;
locked.write_raw(&raw, ctx).await?;
locked.len()
};
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
@@ -713,7 +726,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
let size = inner.file.read().await.len();
inner.resource_units.publish_size(size)
}
@@ -809,7 +822,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
let file_contents = Bytes::from(file_contents);

View File

@@ -107,6 +107,8 @@ async fn smoke_test() {
.expect("tenant harness writes the control file")
};
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
assert_eq!(img_before, img_after);
// evict_and_wait can timeout, but it doesn't cancel the evicting itself

View File

@@ -12,8 +12,10 @@ use serde::{Deserialize, Serialize};
#[cfg(test)]
use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// A unique identifier of a persistent layer.
///
/// This is different from `LayerDescriptor`, which is only used in the benchmarks.
/// This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {

View File

@@ -217,8 +217,9 @@ impl fmt::Display for ImageLayerName {
}
}
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time. The
/// LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time.
///
/// The LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
/// over time (e.g. across shard splits or compression). The physical filenames of layers in local
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])

View File

@@ -226,9 +226,11 @@ impl<'a> IteratorWrapper<'a> {
}
}
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
/// A merge iterator over delta/image layer iterators.
///
/// When duplicated records are found, the iterator will not perform any
/// deduplication, and the caller should handle these situation. By saying
/// duplicated records, there are many possibilities:
///
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.

View File

@@ -34,9 +34,10 @@ impl SplitWriterResult {
}
}
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
@@ -193,9 +194,10 @@ impl SplitImageLayerWriter {
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
/// to be cleaned up).
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm

View File

@@ -18,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
@@ -68,7 +69,9 @@ use crate::{
tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
storage_layer::{
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
},
},
walredo,
};
@@ -1129,22 +1132,38 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
let futs = FuturesUnordered::new();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
async move {
let state = res.expect("Read path is infallible");
assert!(matches!(
state.situation,
ValueReconstructSituation::Complete
));
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
let converted = match convert(key, state).await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
reconstruct_timer.stop_and_record();
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -5496,30 +5515,30 @@ impl Timeline {
#[cfg(test)]
pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>,
lsn: Lsn,
ctx: &RequestContext,
_lsn: Lsn,
_ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1),
&mut reconstruct_data,
ctx,
)
.await?;
for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1));
}
}
}
all_data.sort();
Ok(all_data)
// let mut all_data = Vec::new();
// let guard = self.layers.read().await;
// for layer in guard.layer_map()?.iter_historic_layers() {
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
// let layer = guard.get_from_desc(&layer);
// let mut reconstruct_data = ValuesReconstructState::default();
// layer
// .get_values_reconstruct_data(
// KeySpace::single(Key::MIN..Key::MAX),
// lsn..Lsn(lsn.0 + 1),
// &mut reconstruct_data,
// ctx,
// )
// .await?;
// for (k, v) in reconstruct_data.keys {
// all_data.push((k, v?.img.unwrap().1));
// }
// }
// }
// all_data.sort();
Ok(Vec::new())
}
/// Get all historic layer descriptors in the layer map

View File

@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
pub will_init: bool,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
@@ -355,7 +356,8 @@ pub enum BlobFlag {
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Note: last bool is will_init
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
@@ -420,12 +422,12 @@ impl VectoredReadPlanner {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, false));
}
BlobFlag::ReplaceAll => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, true));
}
BlobFlag::Ignore => {}
}
@@ -436,11 +438,17 @@ impl VectoredReadPlanner {
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
let extended = match &mut current_read_builder {
Some(read_builder) => {
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
}
Some(read_builder) => read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init,
},
),
None => VectoredReadExtended::No,
};
@@ -448,7 +456,11 @@ impl VectoredReadPlanner {
let next_read_builder = VectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init,
},
self.max_read_size,
self.mode,
);
@@ -593,8 +605,10 @@ impl<'a> VectoredBlobReader<'a> {
}
}
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
///
/// It provides a streaming API for getting read blobs. It returns a batch when
/// `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,
@@ -663,10 +677,19 @@ impl StreamingVectoredReadPlanner {
start_offset: u64,
end_offset: u64,
is_last_blob_in_read: bool,
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
) -> Option<VectoredRead> {
match &mut self.read_builder {
Some(read_builder) => {
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
let extended = read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init: false,
},
);
assert_eq!(extended, VectoredReadExtended::Yes);
}
None => {
@@ -674,7 +697,11 @@ impl StreamingVectoredReadPlanner {
Some(VectoredReadBuilder::new_streaming(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init: false,
},
self.mode,
))
};
@@ -1006,6 +1033,7 @@ mod tests {
let meta = BlobMeta {
key: Key::MIN,
lsn: Lsn(0),
will_init: false,
};
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -1,6 +1,7 @@
//!
//! VirtualFile is like a normal File, but it's not bound directly to
//! a file descriptor. Instead, the file is opened when it's read from,
//! a file descriptor.
//!
//! Instead, the file is opened when it's read from,
//! and if too many files are open globally in the system, least-recently
//! used ones are closed.
//!

View File

@@ -43,13 +43,12 @@ use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
/// Only one thread can use the process at a time, that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,

View File

@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if previously elected leader was me;
* plain restart of walproposer not intervened by concurrent
* compute (who could generate WAL) is ok.
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
* Another compute with higher term is running. Panic to restart
* PG as we likely need to retake basebackup. However, don't dump
* core as this is kinda expected scenario.
*
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
*/
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}

View File

@@ -16,7 +16,7 @@ use tracing::debug;
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
use super::{common::Cached, *};
use super::{common::Cached, timed_lru, Cache};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:

View File

@@ -44,16 +44,14 @@
clippy::items_after_statements,
)]
// List of temporarily allowed lints.
// TODO: Switch to except() once stable with 1.81.
// TODO: fix code and reduce list or move to permanent list above.
#![allow(
#![expect(
clippy::cargo_common_metadata,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::doc_markdown,
clippy::implicit_hasher,
clippy::inline_always,
clippy::match_same_arms,
clippy::match_wild_err_arm,
@@ -61,21 +59,28 @@
clippy::missing_panics_doc,
clippy::module_name_repetitions,
clippy::needless_pass_by_value,
clippy::needless_raw_string_hashes,
clippy::redundant_closure_for_method_calls,
clippy::return_self_not_must_use,
clippy::similar_names,
clippy::single_match_else,
clippy::struct_excessive_bools,
clippy::struct_field_names,
clippy::too_many_lines,
clippy::unreadable_literal,
clippy::unused_async,
clippy::unused_self,
clippy::wildcard_imports
clippy::unused_self
)]
#![cfg_attr(
any(test, feature = "testing"),
allow(
clippy::needless_raw_string_hashes,
clippy::unreadable_literal,
clippy::unused_async,
)
)]
// List of temporarily allowed lints to unblock beta/nightly.
#![allow(unknown_lints, clippy::manual_inspect)]
#![allow(
unknown_lints,
// TODO: 1.82: Add `use<T>` where necessary and remove from this list.
impl_trait_overcaptures,
)]
use std::{convert::Infallible, future::Future};

View File

@@ -217,6 +217,7 @@ impl sasl::Mechanism for Exchange<'_> {
self.state = ExchangeState::SaltSent(sent);
Ok(Step::Continue(self, msg))
}
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Success(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}
@@ -224,6 +225,7 @@ impl sasl::Mechanism for Exchange<'_> {
ExchangeState::SaltSent(sent) => {
match sent.transition(self.secret, &self.tls_server_end_point, input)? {
Step::Success(keys, msg) => Ok(Step::Success(keys, msg)),
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
Step::Continue(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
}

View File

@@ -745,22 +745,20 @@ impl BatchQueryData {
builder = builder.deferrable(true);
}
let transaction = builder.start().await.map_err(|e| {
let transaction = builder.start().await.inspect_err(|_| {
// if we cannot start a transaction, we should return immediately
// and not return to the pool. connection is clearly broken
discard.discard();
e
})?;
let json_output =
match query_batch(cancel.child_token(), &transaction, self, parsed_headers).await {
Ok(json_output) => {
info!("commit");
let status = transaction.commit().await.map_err(|e| {
let status = transaction.commit().await.inspect_err(|_| {
// if we cannot commit - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
json_output
@@ -776,11 +774,10 @@ impl BatchQueryData {
}
Err(err) => {
info!("rollback");
let status = transaction.rollback().await.map_err(|e| {
let status = transaction.rollback().await.inspect_err(|_| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
e
})?;
discard.check_idle(status);
return Err(err);

View File

@@ -14,6 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
/// Stream wrapper which implements libpq's protocol.
///
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
/// to pass random malformed bytes through the connection).

View File

@@ -3,5 +3,5 @@ channel = "1.81.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html
# but we also need `llvm-tools-preview` for coverage data merges on CI
components = ["llvm-tools-preview", "rustfmt", "clippy"]
# but we also need `llvm-tools` for coverage data merges on CI
components = ["llvm-tools", "rustfmt", "clippy"]

View File

@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;
/// If tenant_id is provided, allow if token (claims) is for this tenant or
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
/// SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(

View File

@@ -19,7 +19,7 @@ use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage_broker::Uri;
use tracing::*;
@@ -261,6 +261,15 @@ async fn main() -> anyhow::Result<()> {
// Change into the data directory.
std::env::set_current_dir(&workdir)?;
// Prevent running multiple safekeepers on the same directory
let lock_file_path = workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// Set or read our ID.
let id = set_id(&workdir, args.id.map(NodeId))?;
if args.init {
@@ -364,15 +373,15 @@ async fn main() -> anyhow::Result<()> {
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("claimed pid file at {lock_file_path:?}");
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// fsync the datadir to make sure we have a consistent state on disk.
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
let started = Instant::now();
utils::crashsafe::syncfs(dfd)?;
let elapsed = started.elapsed();
info!(
elapsed_ms = elapsed.as_millis(),
"syncfs data directory done"
);
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {

View File

@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
json_response(StatusCode::OK, response)
}
/// Make term at least as high as one in request. If one in request is None,
/// increment current one.
async fn timeline_term_bump_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|r| request_span(r, timeline_backup_partial_reset),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
|r| request_span(r, timeline_term_bump_handler),
)
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})

View File

@@ -484,6 +484,7 @@ pub async fn validate_temp_timeline(
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
///
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.

View File

@@ -448,8 +448,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
/// replies to reply_tx.
///
/// Reading from socket and writing to disk in parallel is beneficial for
/// performance, this struct provides the writing to disk part.
pub struct WalAcceptor {
tli: WalResidentTimeline,
msg_rx: Receiver<ProposerAcceptorMessage>,

View File

@@ -938,8 +938,9 @@ where
}
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
"processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
msg.wal_data.len(),
msg.h.begin_lsn,
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,

View File

@@ -1,9 +1,10 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).
use std::ops::Deref;
use std::{cmp::max, ops::Deref};
use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};
@@ -147,9 +148,11 @@ pub struct TimelineMemState {
pub proposer_uuid: PgUuid,
}
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
/// when we update fields like commit_lsn which don't need immediate
/// persistence. Provides transactional like API to atomically update the state.
/// Safekeeper persistent state plus in memory layer.
///
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
/// which don't need immediate persistence. Provides transactional like API
/// to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
@@ -209,6 +212,27 @@ where
let s = self.start_change();
self.finish_change(&s).await
}
/// Make term at least as `to`. If `to` is None, increment current one. This
/// is not in safekeeper.rs because we want to be able to do it even if
/// timeline is offloaded.
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let before = self.acceptor_state.term;
let mut state = self.start_change();
let new = match to {
Some(to) => max(state.acceptor_state.term, to),
None => state.acceptor_state.term + 1,
};
if new > state.acceptor_state.term {
state.acceptor_state.term = new;
self.finish_change(&state).await?;
}
let after = self.acceptor_state.term;
Ok(TimelineTermBumpResponse {
previous_term: before,
current_term: after,
})
}
}
impl<CTRL> Deref for TimelineState<CTRL>

View File

@@ -4,6 +4,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -169,6 +170,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> {
}
/// This structure is stored in shared state and represents the state of the timeline.
///
/// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
/// case, SafeKeeper is not available (because WAL is not present on disk) and all
/// operations can be done only with control file.
@@ -214,6 +216,10 @@ impl StateSK {
.get_last_log_term(self.flush_lsn())
}
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
self.state_mut().term_bump(to).await
}
/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
@@ -853,6 +859,11 @@ impl Timeline {
Ok(res)
}
pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let mut state = self.write_shared_state().await;
state.sk.term_bump(to).await
}
/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is

View File

@@ -1,6 +1,8 @@
//! Code related to evicting WAL files to remote storage. The actual upload is done by the
//! partial WAL backup code. This file has code to delete and re-download WAL files,
//! cross-validate with partial WAL backup if local file is still present.
//! Code related to evicting WAL files to remote storage.
//!
//! The actual upload is done by the partial WAL backup code. This file has
//! code to delete and re-download WAL files, cross-validate with partial WAL
//! backup if local file is still present.
use anyhow::Context;
use camino::Utf8PathBuf;

View File

@@ -1,4 +1,6 @@
//! Timeline residence guard is needed to ensure that WAL segments are present on disk,
//! Timeline residence guard
//!
//! It is needed to ensure that WAL segments are present on disk,
//! as long as the code is holding the guard. This file implements guard logic, to issue
//! and drop guards, and to notify the manager when the guard is dropped.

View File

@@ -1,4 +1,5 @@
//! The timeline manager task is responsible for managing the timeline's background tasks.
//!
//! It is spawned alongside each timeline and exits when the timeline is deleted.
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
//! It also can manage some reactive state, like should the timeline be active for broker pushes or not.

View File

@@ -60,7 +60,8 @@ impl TimelinesSet {
}
}
/// Guard is used to add or remove timeline from the set.
/// Guard is used to add or remove timelines from the set.
///
/// If the timeline present in set, it will be removed from it on drop.
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
/// It is designed to be used in the manager task only.

View File

@@ -1,6 +1,8 @@
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
//! and `flush_lsn` updates.
//!
//! After the partial segment was updated (`flush_lsn` was changed), the segment
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:

View File

@@ -17,6 +17,7 @@ use crate::SafeKeeperConf;
use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
///
/// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
/// tenant are allowed). Doesn't matter if auth is disabled in conf.

View File

@@ -98,7 +98,19 @@ pub struct PhysicalStorage {
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
write_lsn: Lsn,
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully
/// flushed.
///
/// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
/// switch ingest the reverse is true because we don't bump write_lsn up to
/// the next segment: WAL stream from the compute doesn't have the gap and
/// for simplicity / as a sanity check we disallow any non-sequential
/// writes, so write zeros as is.
///
/// Similar effect is in theory possible due to LSN alignment: if record
/// ends at *2, decoder will report end lsn as *8 even though we haven't
/// written these zeros yet. In practice compute likely never sends
/// non-aligned chunks of data.
write_record_lsn: Lsn,
/// The LSN of the last WAL record flushed to disk.
@@ -167,8 +179,7 @@ impl PhysicalStorage {
)
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
// note: this assumes we fsync'ed whole datadir on start.
let flush_lsn = write_lsn;
debug!(
@@ -440,11 +451,12 @@ impl Storage for PhysicalStorage {
.with_label_values(&["truncate_wal"])
.start_timer();
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
// Streaming must not create a hole, so truncate cannot be called on
// non-written lsn.
if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
"truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
self.write_record_lsn,
end_pos
);
}

View File

@@ -134,7 +134,7 @@ class LLVM:
# Show a user-friendly warning
raise Exception(' '.join([
f"It appears that you don't have `{name}` installed.",
"Please execute `rustup component add llvm-tools-preview`,",
"Please execute `rustup component add llvm-tools`,",
"or install it via your package manager of choice.",
"LLVM tools should be the same version as LLVM in `rustc --version --verbose`.",
]))
@@ -518,7 +518,7 @@ def main() -> None:
example = f"""
prerequisites:
# alternatively, install a system package for `llvm-tools`
rustup component add llvm-tools-preview
rustup component add llvm-tools
self-contained example:
{app} run make

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards DROP preferred_az_id;

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards ADD preferred_az_id VARCHAR;

View File

@@ -14,14 +14,14 @@ use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::{
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
TenantCreateRequest,
ShardsPreferredAzsRequest, TenantCreateRequest,
};
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use pageserver_client::{mgmt_api, BlockUnblock};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
@@ -369,6 +369,23 @@ async fn handle_tenant_timeline_detach_ancestor(
json_response(StatusCode::OK, res)
}
async fn handle_tenant_timeline_block_unblock_gc(
service: Arc<Service>,
req: Request<Body>,
dir: BlockUnblock,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
service
.tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
@@ -539,6 +556,17 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, node_status)
}
async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node_shards(node_id).await?;
json_response(StatusCode::OK, node_status)
}
async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -688,6 +716,18 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
)
}
async fn handle_update_preferred_azs(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.update_shards_preferred_azs(azs_req).await?,
)
}
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1097,6 +1137,13 @@ pub fn make_router(
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.get("/control/v1/node/:node_id/shards", |r| {
named_request_span(
r,
handle_node_shards,
RequestName("control_v1_node_describe"),
)
})
.get("/control/v1/leader", |r| {
named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
})
@@ -1174,6 +1221,13 @@ pub fn make_router(
RequestName("control_v1_tenant_policy"),
)
})
.put("/control/v1/preferred_azs", |r| {
named_request_span(
r,
handle_update_preferred_azs,
RequestName("control_v1_preferred_azs"),
)
})
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})
@@ -1255,6 +1309,26 @@ pub fn make_router(
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
|r| {
tenant_service_handler(
r,
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
RequestName("v1_tenant_timeline_block_unblock_gc"),
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
|r| {
tenant_service_handler(
r,
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
RequestName("v1_tenant_timeline_block_unblock_gc"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

View File

@@ -7,7 +7,10 @@ use pageserver_api::{
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api::{Client, Result};
use pageserver_client::{
mgmt_api::{Client, Result},
BlockUnblock,
};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -258,6 +261,24 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_block_unblock_gc(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<()> {
// measuring these makes no sense because we synchronize with the gc loop and remote
// storage on block_gc so there should be huge outliers
measured_request!(
"timeline_block_unblock_gc",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
.await
)
}
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
measured_request!(
"utilization",

View File

@@ -105,6 +105,7 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthOutdated,
GetLeader,
UpdateLeader,
SetPreferredAzs,
}
#[must_use]
@@ -664,6 +665,33 @@ impl Persistence {
Ok(())
}
pub(crate) async fn set_tenant_shard_preferred_azs(
&self,
preferred_azs: Vec<(TenantShardId, String)>,
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
let mut shards_updated = Vec::default();
for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az))
.execute(conn)?;
if updated == 1 {
shards_updated.push((*tenant_shard_id, preferred_az.clone()));
}
}
Ok(shards_updated)
})
.await
}
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
@@ -1050,6 +1078,11 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
#[serde(default)]
pub(crate) scheduling_policy: String,
// Hint that we should attempt to schedule this tenant shard the given
// availability zone in order to minimise the chances of cross-AZ communication
// with compute.
pub(crate) preferred_az_id: Option<String>,
}
impl TenantShardPersistence {

View File

@@ -41,6 +41,7 @@ diesel::table! {
splitting -> Int2,
config -> Text,
scheduling_policy -> Varchar,
preferred_az_id -> Nullable<Varchar>,
}
}

View File

@@ -25,7 +25,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
@@ -41,7 +41,8 @@ use itertools::Itertools;
use pageserver_api::{
controller_api::{
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
@@ -68,7 +69,7 @@ use pageserver_api::{
ValidateResponse, ValidateResponseTenant,
},
};
use pageserver_client::mgmt_api;
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use utils::{
@@ -116,7 +117,9 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
/// How long a node may be unresponsive to heartbeats during start up before we declare it
/// offline. This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's
/// offline.
///
/// This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's
/// handling of the re-attach response may take a long time and blocks heartbeats from
/// being handled on the pageserver side.
pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
@@ -139,6 +142,7 @@ enum TenantOperations {
AttachHook,
TimelineArchivalConfig,
TimelineDetachAncestor,
TimelineGcBlockUnblock,
}
#[derive(Clone, strum_macros::Display)]
@@ -353,6 +357,12 @@ impl From<DatabaseError> for ApiError {
}
}
enum InitialShardScheduleOutcome {
Scheduled(TenantCreateResponseShard),
NotScheduled,
ShardScheduleError(ScheduleError),
}
pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
@@ -442,7 +452,7 @@ struct ShardSplitParams {
// When preparing for a shard split, we may either choose to proceed with the split,
// or find that the work is already done and return NoOp.
enum ShardSplitAction {
Split(ShardSplitParams),
Split(Box<ShardSplitParams>),
NoOp(TenantShardSplitResponse),
}
@@ -1452,6 +1462,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
};
match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -2023,6 +2034,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
})
.collect();
@@ -2046,99 +2058,87 @@ impl Service {
};
let mut schedule_context = ScheduleContext::default();
let mut schedule_error = None;
let mut response_shards = Vec::new();
for tenant_shard_id in create_ids {
tracing::info!("Creating shard {tenant_shard_id}...");
let (waiters, response_shards) = {
let outcome = self
.do_initial_shard_scheduling(
tenant_shard_id,
initial_generation,
&create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
&mut schedule_context,
)
.await;
match outcome {
InitialShardScheduleOutcome::Scheduled(resp) => response_shards.push(resp),
InitialShardScheduleOutcome::NotScheduled => {}
InitialShardScheduleOutcome::ShardScheduleError(err) => {
schedule_error = Some(err);
}
}
}
let preferred_azs = {
let locked = self.inner.read().unwrap();
response_shards
.iter()
.filter_map(|resp| {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
Some((resp.shard_id, az_id))
})
.collect::<Vec<_>>()
};
// Note that we persist the preferred AZ for the new shards separately.
// In theory, we could "peek" the scheduler to determine where the shard will
// land, but the subsequent "real" call into the scheduler might select a different
// node. Hence, we do this awkward update to keep things consistent.
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
})?;
{
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut response_shards = Vec::new();
let mut schcedule_error = None;
for tenant_shard_id in create_ids {
tracing::info!("Creating shard {tenant_shard_id}...");
use std::collections::btree_map::Entry;
match tenants.entry(tenant_shard_id) {
Entry::Occupied(mut entry) => {
tracing::info!(
"Tenant shard {tenant_shard_id} already exists while creating"
);
// TODO: schedule() should take an anti-affinity expression that pushes
// attached and secondary locations (independently) away frorm those
// pageservers also holding a shard for this tenant.
entry
.get_mut()
.schedule(scheduler, &mut schedule_context)
.map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
))
})?;
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
.get()
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
}
continue;
}
Entry::Vacant(entry) => {
let state = entry.insert(TenantShard::new(
tenant_shard_id,
ShardIdentity::from_params(
tenant_shard_id.shard_number,
&create_req.shard_parameters,
),
placement_policy.clone(),
));
state.generation = initial_generation;
state.config = create_req.config.clone();
if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
schcedule_error = Some(e);
}
// Only include shards in result if we are attaching: the purpose
// of the response is to tell the caller where the shards are attached.
if let Some(node_id) = state.intent.get_attached() {
let generation = state
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
}
}
};
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
// If we failed to schedule shards, then they are still created in the controller,
// but we return an error to the requester to avoid a silent failure when someone
// tries to e.g. create a tenant whose placement policy requires more nodes than
// are present in the system. We do this here rather than in the above loop, to
// avoid situations where we only create a subset of shards in the tenant.
if let Some(e) = schcedule_error {
return Err(ApiError::Conflict(format!(
"Failed to schedule shard(s): {e}"
)));
}
// If we failed to schedule shards, then they are still created in the controller,
// but we return an error to the requester to avoid a silent failure when someone
// tries to e.g. create a tenant whose placement policy requires more nodes than
// are present in the system. We do this here rather than in the above loop, to
// avoid situations where we only create a subset of shards in the tenant.
if let Some(e) = schedule_error {
return Err(ApiError::Conflict(format!(
"Failed to schedule shard(s): {e}"
)));
}
let waiters = tenants
let waiters = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
.collect::<Vec<_>>();
(waiters, response_shards)
.collect::<Vec<_>>()
};
Ok((
@@ -2149,6 +2149,78 @@ impl Service {
))
}
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
/// case of a new tenant and a pre-existing one.
async fn do_initial_shard_scheduling(
&self,
tenant_shard_id: TenantShardId,
initial_generation: Option<Generation>,
shard_params: &ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
schedule_context: &mut ScheduleContext,
) -> InitialShardScheduleOutcome {
let mut locked = self.inner.write().unwrap();
let (_nodes, tenants, scheduler) = locked.parts_mut();
use std::collections::btree_map::Entry;
match tenants.entry(tenant_shard_id) {
Entry::Occupied(mut entry) => {
tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");
// TODO: schedule() should take an anti-affinity expression that pushes
// attached and secondary locations (independently) away frorm those
// pageservers also holding a shard for this tenant.
if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
return InitialShardScheduleOutcome::ShardScheduleError(err);
}
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
.get()
.generation
.expect("Generation is set when in attached mode");
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
})
} else {
InitialShardScheduleOutcome::NotScheduled
}
}
Entry::Vacant(entry) => {
let state = entry.insert(TenantShard::new(
tenant_shard_id,
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
placement_policy,
));
state.generation = initial_generation;
state.config = config;
if let Err(e) = state.schedule(scheduler, schedule_context) {
return InitialShardScheduleOutcome::ShardScheduleError(e);
}
// Only include shards in result if we are attaching: the purpose
// of the response is to tell the caller where the shards are attached.
if let Some(node_id) = state.intent.get_attached() {
let generation = state
.generation
.expect("Generation is set when in attached mode");
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
})
} else {
InitialShardScheduleOutcome::NotScheduled
}
}
}
}
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
/// wait for reconciliation to complete before responding.
async fn await_waiters(
@@ -3126,6 +3198,57 @@ impl Service {
}).await?
}
pub(crate) async fn tenant_timeline_block_unblock_gc(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
dir: BlockUnblock,
) -> Result<(), ApiError> {
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineGcBlockUnblock,
)
.await;
self.tenant_remote_mutation(tenant_id, move |targets| async move {
if targets.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
}
async fn do_one(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
jwt: Option<String>,
dir: BlockUnblock,
) -> Result<(), ApiError> {
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
client
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
.await
.map_err(|e| passthrough_api_error(&node, e))
}
// no shard needs to go first/last; the operation should be idempotent
self.tenant_for_shards(targets, |tenant_shard_id, node| {
futures::FutureExt::boxed(do_one(
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
dir,
))
})
.await
})
.await??;
Ok(())
}
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
///
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
@@ -3511,6 +3634,7 @@ impl Service {
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
scheduling_policy: *shard.get_scheduling_policy(),
preferred_az_id: shard.preferred_az().map(ToString::to_string),
})
}
@@ -4114,7 +4238,7 @@ impl Service {
let policy = policy.unwrap();
let config = config.unwrap();
Ok(ShardSplitAction::Split(ShardSplitParams {
Ok(ShardSplitAction::Split(Box::new(ShardSplitParams {
old_shard_count,
new_shard_count: ShardCount::new(split_req.new_shard_count),
new_stripe_size: split_req.new_stripe_size,
@@ -4122,13 +4246,13 @@ impl Service {
policy,
config,
shard_ident,
}))
})))
}
async fn do_tenant_shard_split(
&self,
tenant_id: TenantId,
params: ShardSplitParams,
params: Box<ShardSplitParams>,
) -> Result<(TenantShardSplitResponse, Vec<ReconcilerWaiter>), ApiError> {
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
@@ -4144,7 +4268,7 @@ impl Service {
policy,
config,
shard_ident,
} = params;
} = *params;
// Drop any secondary locations: pageservers do not support splitting these, and in any case the
// end-state for a split tenant will usually be to have secondary locations on different nodes.
@@ -4214,9 +4338,10 @@ impl Service {
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::Splitting,
// Scheduling policies do not carry through to children
// Scheduling policies and preferred AZ do not carry through to children
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
});
}
@@ -4336,6 +4461,47 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Now that we have scheduled the child shards, attempt to set their preferred AZ
// to that of the pageserver they've been attached on.
let preferred_azs = {
let locked = self.inner.read().unwrap();
child_locations
.iter()
.filter_map(|(tid, node_id, _stripe_size)| {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
Some((*tid, az_id))
})
.collect::<Vec<_>>()
};
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
});
match updated {
Ok(updated) => {
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
Err(err) => {
tracing::warn!("Failed to persist preferred AZs after split: {err}");
}
}
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {
@@ -4810,6 +4976,45 @@ impl Service {
))
}
pub(crate) async fn get_node_shards(
&self,
node_id: NodeId,
) -> Result<NodeShardResponse, ApiError> {
let locked = self.inner.read().unwrap();
let mut shards = Vec::new();
for (tid, tenant) in locked.tenants.iter() {
let is_intended_secondary = match (
tenant.intent.get_attached() == &Some(node_id),
tenant.intent.get_secondary().contains(&node_id),
) {
(true, true) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"{} attached as primary+secondary on the same node",
tid
)))
}
(true, false) => Some(false),
(false, true) => Some(true),
(false, false) => None,
};
let is_observed_secondary = if let Some(ObservedStateLocation { conf: Some(conf) }) =
tenant.observed.locations.get(&node_id)
{
Some(conf.secondary_conf.is_some())
} else {
None
};
if is_intended_secondary.is_some() || is_observed_secondary.is_some() {
shards.push(NodeShard {
tenant_shard_id: *tid,
is_intended_secondary,
is_observed_secondary,
});
}
}
Ok(NodeShardResponse { node_id, shards })
}
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
self.persistence.get_leader().await
}
@@ -6497,4 +6702,35 @@ impl Service {
) -> Result<(), DatabaseError> {
self.persistence.safekeeper_upsert(record).await
}
pub(crate) async fn update_shards_preferred_azs(
&self,
req: ShardsPreferredAzsRequest,
) -> Result<ShardsPreferredAzsResponse, ApiError> {
let preferred_azs = req.preferred_az_ids.into_iter().collect::<Vec<_>>();
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred AZs: {err}"
))
})?;
let mut updated_in_mem_and_db = Vec::default();
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
let shard = locked.tenants.get_mut(&tid);
if let Some(shard) = shard {
shard.set_preferred_az(az_id);
updated_in_mem_and_db.push(tid);
}
}
Ok(ShardsPreferredAzsResponse {
updated: updated_in_mem_and_db,
})
}
}

View File

@@ -140,6 +140,10 @@ pub(crate) struct TenantShard {
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
// be set to a non-active state to avoid making changes while the issue is fixed.
scheduling_policy: ShardSchedulingPolicy,
// We should attempt to schedule this shard in the provided AZ to
// decrease chances of cross-AZ compute.
preferred_az_id: Option<String>,
}
#[derive(Default, Clone, Debug, Serialize)]
@@ -463,6 +467,7 @@ impl TenantShard {
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_az_id: None,
}
}
@@ -1297,6 +1302,7 @@ impl TenantShard {
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
preferred_az_id: tsp.preferred_az_id,
})
}
@@ -1312,8 +1318,17 @@ impl TenantShard {
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
preferred_az_id: self.preferred_az_id.clone(),
}
}
pub(crate) fn preferred_az(&self) -> Option<&str> {
self.preferred_az_id.as_deref()
}
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
self.preferred_az_id = Some(preferred_az_id);
}
}
#[cfg(test)]

View File

@@ -1,6 +1,7 @@
//! Functionality for finding and purging garbage, as in "garbage collection". Garbage means
//! S3 objects which are either not referenced by any metadata, or are referenced by a
//! control plane tenant/timeline in a deleted state.
//! Functionality for finding and purging garbage, as in "garbage collection".
//!
//! Garbage means S3 objects which are either not referenced by any metadata,
//! or are referenced by a control plane tenant/timeline in a deleted state.
use std::{
collections::{HashMap, HashSet},

View File

@@ -74,7 +74,9 @@ pub async fn stream_tenant_shards<'a>(
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// using a listing.
///
/// The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
remote_client: &'a GenericRemoteStorage,

View File

@@ -440,9 +440,10 @@ async fn gc_ancestor(
Ok(())
}
/// Physical garbage collection: removing unused S3 objects. This is distinct from the garbage collection
/// done inside the pageserver, which operates at a higher level (keys, layers). This type of garbage collection
/// is about removing:
/// Physical garbage collection: removing unused S3 objects.
///
/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
/// (keys, layers). This type of garbage collection is about removing:
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
/// uploading a layer and uploading an index)
/// - Index objects from historic generations

View File

@@ -140,6 +140,14 @@ class TenantId(Id):
return self.id.hex()
class NodeId(Id):
def __repr__(self) -> str:
return f'`NodeId("{self.id.hex()}")'
def __str__(self) -> str:
return self.id.hex()
class TimelineId(Id):
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'

View File

@@ -62,7 +62,7 @@ from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
@@ -2560,7 +2560,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_describe(self, tenant_id: TenantId):
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int}
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int, preferred_az_id: str}
"""
response = self.request(
"GET",
@@ -2570,6 +2570,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return response.json()
def nodes(self):
"""
:return: list of {"id": ""}
"""
response = self.request(
"GET",
f"{self.api}/control/v1/node",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
return response.json()
def node_shards(self, node_id: NodeId):
"""
:return: list of {"shard_id": "", "is_secondary": bool}
"""
response = self.request(
"GET",
f"{self.api}/control/v1/node/{node_id}/shards",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
return response.json()
def tenant_shard_split(
self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
) -> list[TenantShardId]:
@@ -2886,6 +2910,17 @@ class NeonStorageController(MetricsGetter, LogUtils):
return None
raise e
def set_preferred_azs(self, preferred_azs: dict[TenantShardId, str]) -> list[TenantShardId]:
response = self.request(
"PUT",
f"{self.api}/control/v1/preferred_azs",
headers=self.headers(TokenScope.ADMIN),
json={str(tid): az for tid, az in preferred_azs.items()},
)
response.raise_for_status()
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
def __enter__(self) -> "NeonStorageController":
return self

View File

@@ -50,6 +50,19 @@ class SafekeeperMetrics(Metrics):
).value
@dataclass
class TermBumpResponse:
previous_term: int
current_term: int
@classmethod
def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
return TermBumpResponse(
previous_term=d["previous_term"],
current_term=d["current_term"],
)
class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError
@@ -252,6 +265,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
res.raise_for_status()
return res.json()
def term_bump(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
term: Optional[int],
) -> TermBumpResponse:
body = {}
if term is not None:
body["term"] = term
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
json=body,
)
res.raise_for_status()
return TermBumpResponse.from_json(res.json())
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",

View File

@@ -1552,6 +1552,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
literal_shard_count = 1 if shard_count is None else shard_count
assert len(describe["shards"]) == literal_shard_count
nodes = env.storage_controller.nodes()
assert len(nodes) == 2
describe1 = env.storage_controller.node_shards(nodes[0]["id"])
describe2 = env.storage_controller.node_shards(nodes[1]["id"])
assert len(describe1["shards"]) + len(describe2["shards"]) == literal_shard_count
# Check the data is still there: this implicitly proves that we recovered generation numbers
# properly, for the timeline which was written to after a generation bump.
for timeline, branch, expect_rows in [
@@ -2512,3 +2518,55 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
del d[key]
return compared[0] == compared[1]
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id']}"
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tids = [TenantId.generate() for _ in range(0, 3)]
for tid in tids:
env.storage_controller.tenant_create(tid)
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
attached_to = shards[0]["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
assert shards[0]["preferred_az_id"] == expected_az
updated = env.storage_controller.set_preferred_azs(
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
)
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
for tid in tids:
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
assert shards[0]["preferred_az_id"] == "foo"
# Generate a layer to avoid shard split handling on ps from tripping
# up on debug assert.
timeline_id = TimelineId.generate()
env.neon_cli.create_timeline("bar", tids[0], timeline_id)
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
workload.init()
workload.write_rows(256)
workload.validate()
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
assert len(shards) == 2
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == expected_az

View File

@@ -1,17 +1,32 @@
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LogCursor,
NeonEnvBuilder,
NeonPageserver,
)
from fixtures.pageserver.utils import wait_timeline_detail_404
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("sharded", [True, False])
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
neon_env_builder.num_pageservers = 2 if sharded else 1
env = neon_env_builder.init_start(
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
initial_tenant_shard_count=2 if sharded else None,
)
ps = env.pageserver
http = ps.http_client()
if sharded:
http = env.storage_controller.pageserver_api()
else:
http = env.pageserver.http_client()
pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
@@ -22,9 +37,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
tenant_before = http.tenant_status(env.initial_tenant)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line)
assert ps.log_contains(gc_skipped_line, offset) is None
pss.assert_log_contains(gc_active_line)
pss.assert_log_does_not_contain(gc_skipped_line)
http.timeline_block_gc(env.initial_tenant, foo_branch)
@@ -34,34 +48,78 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
pss.assert_log_contains(gc_skipped_line)
ps.restart()
ps.quiesce_tenants()
pss.restart()
pss.quiesce_tenants()
_, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
pss.assert_log_contains(init_gc_skipped)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
pss.assert_log_contains(gc_skipped_line)
# deletion unblocks gc
http.timeline_delete(env.initial_tenant, foo_branch)
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line, offset)
pss.assert_log_contains(gc_active_line)
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
pss.assert_log_contains(gc_skipped_line)
# removing the manual block also unblocks gc
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line, offset)
pss.assert_log_contains(gc_active_line)
def wait_for_another_gc_round():
time.sleep(2)
@dataclass
class ScrollableLog:
pageserver: NeonPageserver
offset: Optional[LogCursor]
def assert_log_contains(self, what: str):
msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
old = self.offset
self.offset = offset
log.info(f"{old} -> {offset}: {msg}")
def assert_log_does_not_contain(self, what: str):
assert self.pageserver.log_contains(what) is None
@dataclass(frozen=True)
class ManyPageservers:
many: List[ScrollableLog]
def assert_log_contains(self, what: str):
for one in self.many:
one.assert_log_contains(what)
def assert_log_does_not_contain(self, what: str):
for one in self.many:
one.assert_log_does_not_contain(what)
def restart(self):
def do_restart(x: ScrollableLog):
x.pageserver.restart()
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
rt.map(do_restart, self.many)
rt.shutdown(wait=True)
def quiesce_tenants(self):
def do_quiesce(x: ScrollableLog):
x.pageserver.quiesce_tenants()
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
rt.map(do_quiesce, self.many)
rt.shutdown(wait=True)

View File

@@ -1057,6 +1057,24 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
endpoint.start()
# Try restarting endpoint immediately after xlog switch.
# https://github.com/neondatabase/neon/issues/8911
def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("create table t (i int)")
endpoint.safe_psql("SELECT pg_switch_wal()")
# we want immediate shutdown to have endpoint restart on xlog switch record,
# so prevent shutdown checkpoint.
endpoint.stop(mode="immediate")
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("SELECT 'works'")
# Context manager which logs passed time on exit.
class DurationLogger:
def __init__(self, desc):
@@ -2176,6 +2194,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
def test_term_bump(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
# initialize safekeeper
endpoint.safe_psql("create table t(key int, value text)")
http_cli = env.safekeepers[0].http_client()
# check that bump up to specific term works
curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
bump_to = curr_term + 3
res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
log.info(f"bump to {bump_to} res: {res}")
assert res.current_term >= bump_to
# check that bump to none increments current term
res = http_cli.term_bump(tenant_id, timeline_id, None)
log.info(f"bump to None res: {res}")
assert res.current_term > bump_to
assert res.current_term > res.previous_term
# check that bumping doesn't work downward
res = http_cli.term_bump(tenant_id, timeline_id, 2)
log.info(f"bump to 2 res: {res}")
assert res.current_term > bump_to
assert res.current_term == res.previous_term
# check that this doesn't kill endpoint because last WAL flush was his and
# thus its basebackup is still good
endpoint.safe_psql("insert into t values (1, 'payload')")
# Test disables periodic pushes from safekeeper to the broker and checks that
# pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):

View File

@@ -8,6 +8,8 @@ version = "0.1.0"
description = "workspace-hack package, managed by hakari"
# You can choose to publish this crate: see https://docs.rs/cargo-hakari/latest/cargo_hakari/publishing.
publish = false
edition.workspace = true
license.workspace = true
# The parts of the file between the BEGIN HAKARI SECTION and END HAKARI SECTION comments
# are managed by hakari.