mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Compare commits
27 Commits
joonas/syn
...
vlad/test-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2cb10590e | ||
|
|
2923fd2a5b | ||
|
|
2a5336b9ab | ||
|
|
6f20726610 | ||
|
|
29f741e1e9 | ||
|
|
2b37a40079 | ||
|
|
af2b65a2fb | ||
|
|
5d194c7824 | ||
|
|
ac2702afd3 | ||
|
|
88fd46d795 | ||
|
|
2d6763882e | ||
|
|
c0c23cde72 | ||
|
|
942bc9544b | ||
|
|
02b7cdb305 | ||
|
|
7d7d1f354b | ||
|
|
16c200d6d9 | ||
|
|
3dbd34aa78 | ||
|
|
fa3fc73c1b | ||
|
|
ac5815b594 | ||
|
|
30583cb626 | ||
|
|
c1a51416db | ||
|
|
8eab7009c1 | ||
|
|
11cf16e3f3 | ||
|
|
af6f63617e | ||
|
|
e287f36a05 | ||
|
|
cbcd4058ed | ||
|
|
e86fef05dd |
7
.github/actionlint.yml
vendored
7
.github/actionlint.yml
vendored
@@ -7,6 +7,13 @@ self-hosted-runner:
|
||||
- small-arm64
|
||||
- us-east-2
|
||||
config-variables:
|
||||
- AZURE_DEV_CLIENT_ID
|
||||
- AZURE_DEV_REGISTRY_NAME
|
||||
- AZURE_DEV_SUBSCRIPTION_ID
|
||||
- AZURE_PROD_CLIENT_ID
|
||||
- AZURE_PROD_REGISTRY_NAME
|
||||
- AZURE_PROD_SUBSCRIPTION_ID
|
||||
- AZURE_TENANT_ID
|
||||
- BENCHMARK_PROJECT_ID_PUB
|
||||
- BENCHMARK_PROJECT_ID_SUB
|
||||
- REMOTE_STORAGE_AZURE_CONTAINER
|
||||
|
||||
56
.github/workflows/_push-to-acr.yml
vendored
Normal file
56
.github/workflows/_push-to-acr.yml
vendored
Normal file
@@ -0,0 +1,56 @@
|
||||
name: Push images to ACR
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
client_id:
|
||||
description: Client ID of Azure managed identity or Entra app
|
||||
required: true
|
||||
type: string
|
||||
image_tag:
|
||||
description: Tag for the container image
|
||||
required: true
|
||||
type: string
|
||||
images:
|
||||
description: Images to push
|
||||
required: true
|
||||
type: string
|
||||
registry_name:
|
||||
description: Name of the container registry
|
||||
required: true
|
||||
type: string
|
||||
subscription_id:
|
||||
description: Azure subscription ID
|
||||
required: true
|
||||
type: string
|
||||
tenant_id:
|
||||
description: Azure tenant ID
|
||||
required: true
|
||||
type: string
|
||||
|
||||
jobs:
|
||||
push-to-acr:
|
||||
runs-on: ubuntu-22.04
|
||||
permissions:
|
||||
contents: read # This is required for actions/checkout
|
||||
id-token: write # This is required for Azure Login to work.
|
||||
|
||||
steps:
|
||||
- name: Azure login
|
||||
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
|
||||
with:
|
||||
client-id: ${{ inputs.client_id }}
|
||||
subscription-id: ${{ inputs.subscription_id }}
|
||||
tenant-id: ${{ inputs.tenant_id }}
|
||||
|
||||
- name: Login to ACR
|
||||
run: |
|
||||
az acr login --name=${{ inputs.registry_name }}
|
||||
|
||||
- name: Copy docker images to ACR ${{ inputs.registry_name }}
|
||||
run: |
|
||||
images='${{ inputs.images }}'
|
||||
for image in ${images}; do
|
||||
docker buildx imagetools create \
|
||||
-t ${{ inputs.registry_name }}.azurecr.io/neondatabase/${image}:${{ inputs.image_tag }} \
|
||||
neondatabase/${image}:${{ inputs.image_tag }}
|
||||
done
|
||||
53
.github/workflows/build_and_test.yml
vendored
53
.github/workflows/build_and_test.yml
vendored
@@ -794,9 +794,6 @@ jobs:
|
||||
docker compose -f ./docker-compose/docker-compose.yml down
|
||||
|
||||
promote-images:
|
||||
permissions:
|
||||
contents: read # This is required for actions/checkout
|
||||
id-token: write # This is required for Azure Login to work.
|
||||
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
@@ -823,28 +820,6 @@ jobs:
|
||||
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
|
||||
done
|
||||
|
||||
- name: Azure login
|
||||
if: github.ref_name == 'main'
|
||||
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
|
||||
with:
|
||||
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
|
||||
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
|
||||
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
|
||||
- name: Login to ACR
|
||||
if: github.ref_name == 'main'
|
||||
run: |
|
||||
az acr login --name=neoneastus2
|
||||
|
||||
- name: Copy docker images to ACR-dev
|
||||
if: github.ref_name == 'main'
|
||||
run: |
|
||||
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
|
||||
docker buildx imagetools create \
|
||||
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
|
||||
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
|
||||
done
|
||||
|
||||
- name: Add latest tag to images
|
||||
if: github.ref_name == 'main'
|
||||
run: |
|
||||
@@ -882,6 +857,30 @@ jobs:
|
||||
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
|
||||
done
|
||||
|
||||
push-to-acr-dev:
|
||||
if: github.ref_name == 'main'
|
||||
needs: [ tag, promote-images ]
|
||||
uses: ./.github/workflows/_push-to-acr.yml
|
||||
with:
|
||||
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
|
||||
image_tag: ${{ needs.tag.outputs.build-tag }}
|
||||
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
|
||||
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
|
||||
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
tenant_id: ${{ vars.AZURE_TENANT_ID }}
|
||||
|
||||
push-to-acr-prod:
|
||||
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
needs: [ tag, promote-images ]
|
||||
uses: ./.github/workflows/_push-to-acr.yml
|
||||
with:
|
||||
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
|
||||
image_tag: ${{ needs.tag.outputs.build-tag }}
|
||||
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 compute-node-v14 compute-node-v15 compute-node-v16
|
||||
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
|
||||
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
|
||||
tenant_id: ${{ vars.AZURE_TENANT_ID }}
|
||||
|
||||
trigger-custom-extensions-build-and-wait:
|
||||
needs: [ check-permissions, tag ]
|
||||
runs-on: ubuntu-22.04
|
||||
@@ -957,8 +956,8 @@ jobs:
|
||||
exit 1
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
|
||||
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ]
|
||||
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled()
|
||||
|
||||
runs-on: [ self-hosted, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
|
||||
34
.github/workflows/label-for-external-users.yml
vendored
34
.github/workflows/label-for-external-users.yml
vendored
@@ -7,6 +7,11 @@ on:
|
||||
pull_request_target:
|
||||
types:
|
||||
- opened
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
github-actor:
|
||||
description: 'GitHub username. If empty, the username of the current user will be used'
|
||||
required: false
|
||||
|
||||
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
|
||||
permissions: {}
|
||||
@@ -26,12 +31,31 @@ jobs:
|
||||
id: check-user
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
ACTOR: ${{ inputs.github-actor || github.actor }}
|
||||
run: |
|
||||
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
|
||||
is_member=true
|
||||
else
|
||||
is_member=false
|
||||
fi
|
||||
expected_error="User does not exist or is not a member of the organization"
|
||||
output_file=output.txt
|
||||
|
||||
for i in $(seq 1 10); do
|
||||
if gh api "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${ACTOR}" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-H "X-GitHub-Api-Version: 2022-11-28" > ${output_file}; then
|
||||
|
||||
is_member=true
|
||||
break
|
||||
elif grep -q "${expected_error}" ${output_file}; then
|
||||
is_member=false
|
||||
break
|
||||
elif [ $i -eq 10 ]; then
|
||||
title="Failed to get memmbership status for ${ACTOR}"
|
||||
message="The latest GitHub API error message: '$(cat ${output_file})'"
|
||||
echo "::error file=.github/workflows/label-for-external-users.yml,title=${title}::${message}"
|
||||
|
||||
exit 1
|
||||
fi
|
||||
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
|
||||
@@ -207,7 +207,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
export PATH="$HOME/.cargo/bin:$PATH" && \
|
||||
. "$HOME/.cargo/env" && \
|
||||
cargo --version && rustup --version && \
|
||||
rustup component add llvm-tools-preview rustfmt clippy && \
|
||||
rustup component add llvm-tools rustfmt clippy && \
|
||||
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
|
||||
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
|
||||
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
|
||||
|
||||
@@ -22,9 +22,10 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
||||
|
||||
/// Escape a string for including it in a SQL literal. Wrapping the result
|
||||
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
|
||||
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
|
||||
/// Escape a string for including it in a SQL literal.
|
||||
///
|
||||
/// Wrapping the result with `E'{}'` or `'{}'` is not required,
|
||||
/// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
|
||||
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
|
||||
/// for the original implementation.
|
||||
pub fn escape_literal(s: &str) -> String {
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
|
||||
TenantDescribeResponse, TenantPolicyRequest,
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
|
||||
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
|
||||
@@ -80,7 +80,10 @@ enum Command {
|
||||
/// List nodes known to the storage controller
|
||||
Nodes {},
|
||||
/// List tenants known to the storage controller
|
||||
Tenants {},
|
||||
Tenants {
|
||||
/// If this field is set, it will list the tenants on a specific node
|
||||
node_id: Option<NodeId>,
|
||||
},
|
||||
/// Create a new tenant in the storage controller, and by extension on pageservers.
|
||||
TenantCreate {
|
||||
#[arg(long)]
|
||||
@@ -403,7 +406,41 @@ async fn main() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::Tenants {} => {
|
||||
Command::Tenants {
|
||||
node_id: Some(node_id),
|
||||
} => {
|
||||
let describe_response = storcon_client
|
||||
.dispatch::<(), NodeShardResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}/shards"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let shards = describe_response.shards;
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header([
|
||||
"Shard",
|
||||
"Intended Primary/Secondary",
|
||||
"Observed Primary/Secondary",
|
||||
]);
|
||||
for shard in shards {
|
||||
table.add_row([
|
||||
format!("{}", shard.tenant_shard_id),
|
||||
match shard.is_intended_secondary {
|
||||
None => "".to_string(),
|
||||
Some(true) => "Secondary".to_string(),
|
||||
Some(false) => "Primary".to_string(),
|
||||
},
|
||||
match shard.is_observed_secondary {
|
||||
None => "".to_string(),
|
||||
Some(true) => "Secondary".to_string(),
|
||||
Some(false) => "Primary".to_string(),
|
||||
},
|
||||
]);
|
||||
}
|
||||
println!("{table}");
|
||||
}
|
||||
Command::Tenants { node_id: None } => {
|
||||
let mut resp = storcon_client
|
||||
.dispatch::<(), Vec<TenantDescribeResponse>>(
|
||||
Method::GET,
|
||||
|
||||
@@ -68,6 +68,7 @@ macro_rules! register_uint_gauge {
|
||||
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
|
||||
|
||||
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
|
||||
///
|
||||
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
|
||||
/// while holding the lock.
|
||||
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -74,6 +74,17 @@ pub struct TenantPolicyRequest {
|
||||
pub scheduling: Option<ShardSchedulingPolicy>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ShardsPreferredAzsRequest {
|
||||
#[serde(flatten)]
|
||||
pub preferred_az_ids: HashMap<TenantShardId, String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ShardsPreferredAzsResponse {
|
||||
pub updated: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantLocateResponseShard {
|
||||
pub shard_id: TenantShardId,
|
||||
@@ -101,6 +112,21 @@ pub struct TenantDescribeResponse {
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct NodeShardResponse {
|
||||
pub node_id: NodeId,
|
||||
pub shards: Vec<NodeShard>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct NodeShard {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
/// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
|
||||
pub is_observed_secondary: Option<bool>,
|
||||
/// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
|
||||
pub is_intended_secondary: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct NodeDescribeResponse {
|
||||
pub id: NodeId,
|
||||
@@ -132,8 +158,12 @@ pub struct TenantDescribeResponseShard {
|
||||
pub is_splitting: bool,
|
||||
|
||||
pub scheduling_policy: ShardSchedulingPolicy,
|
||||
|
||||
pub preferred_az_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Migration request for a given tenant shard to a given node.
|
||||
///
|
||||
/// Explicitly migrating a particular shard is a low level operation
|
||||
/// TODO: higher level "Reschedule tenant" operation where the request
|
||||
/// specifies some constraints, e.g. asking it to get off particular node(s)
|
||||
|
||||
@@ -305,8 +305,10 @@ pub struct TenantConfig {
|
||||
pub lsn_lease_length_for_ts: Option<String>,
|
||||
}
|
||||
|
||||
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
|
||||
/// tenant config. When the first aux file written, the policy will be persisted in the
|
||||
/// The policy for the aux file storage.
|
||||
///
|
||||
/// It can be switched through `switch_aux_file_policy` tenant config.
|
||||
/// When the first aux file written, the policy will be persisted in the
|
||||
/// `index_part.json` file and has a limited migration path.
|
||||
///
|
||||
/// Currently, we only allow the following migration path:
|
||||
@@ -896,7 +898,9 @@ pub struct WalRedoManagerStatus {
|
||||
pub process: Option<WalRedoManagerProcessStatus>,
|
||||
}
|
||||
|
||||
/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
|
||||
/// The progress of a secondary tenant.
|
||||
///
|
||||
/// It is mostly useful when doing a long running download: e.g. initiating
|
||||
/// a download job, timing out while waiting for it to run, and then inspecting this status to understand
|
||||
/// what's happening.
|
||||
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
|
||||
|
||||
@@ -69,8 +69,10 @@ impl QueryError {
|
||||
}
|
||||
|
||||
/// Returns true if the given error is a normal consequence of a network issue,
|
||||
/// or the client closing the connection. These errors can happen during normal
|
||||
/// operations, and don't indicate a bug in our code.
|
||||
/// or the client closing the connection.
|
||||
///
|
||||
/// These errors can happen during normal operations,
|
||||
/// and don't indicate a bug in our code.
|
||||
pub fn is_expected_io_error(e: &io::Error) -> bool {
|
||||
use io::ErrorKind::*;
|
||||
matches!(
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::fmt;
|
||||
use url::Host;
|
||||
|
||||
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
|
||||
///
|
||||
/// The `host` part should be a correct `url::Host`, while `port` (if present) should be
|
||||
/// a valid decimal u16 of digits only.
|
||||
pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>), anyhow::Error> {
|
||||
|
||||
@@ -45,6 +45,8 @@ pub use azure_core::Etag;
|
||||
|
||||
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
|
||||
|
||||
/// Default concurrency limit for S3 operations
|
||||
///
|
||||
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
|
||||
/// ~200 RPS for IAM services
|
||||
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
|
||||
@@ -300,7 +302,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
) -> Result<(), TimeTravelError>;
|
||||
}
|
||||
|
||||
/// DownloadStream is sensitive to the timeout and cancellation used with the original
|
||||
/// Data part of an ongoing [`Download`].
|
||||
///
|
||||
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
|
||||
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
|
||||
/// with `tokio::io::copy_buf`.
|
||||
// This has 'static because safekeepers do not use cancellation tokens (yet)
|
||||
|
||||
@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
|
||||
pub target_timeline_id: TimelineId,
|
||||
pub until_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TimelineTermBumpRequest {
|
||||
/// bump to
|
||||
pub term: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TimelineTermBumpResponse {
|
||||
// before the request
|
||||
pub previous_term: u64,
|
||||
pub current_term: u64,
|
||||
}
|
||||
|
||||
@@ -5,9 +5,10 @@
|
||||
mod calculation;
|
||||
pub mod svg;
|
||||
|
||||
/// StorageModel is the input to the synthetic size calculation. It represents
|
||||
/// a tree of timelines, with just the information that's needed for the
|
||||
/// calculation. This doesn't track timeline names or where each timeline
|
||||
/// StorageModel is the input to the synthetic size calculation.
|
||||
///
|
||||
/// It represents a tree of timelines, with just the information that's needed
|
||||
/// for the calculation. This doesn't track timeline names or where each timeline
|
||||
/// begins and ends, for example. Instead, it consists of "points of interest"
|
||||
/// on the timelines. A point of interest could be the timeline start or end point,
|
||||
/// the oldest point on a timeline that needs to be retained because of PITR
|
||||
|
||||
@@ -5,8 +5,10 @@ use std::{
|
||||
|
||||
use metrics::IntCounter;
|
||||
|
||||
/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
|
||||
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
|
||||
/// Circuit breakers are for operations that are expensive and fallible.
|
||||
///
|
||||
/// If a circuit breaker fails repeatedly, we will stop attempting it for some
|
||||
/// period of time, to avoid denial-of-service from retries, and
|
||||
/// to mitigate the log spam from repeated failures.
|
||||
pub struct CircuitBreaker {
|
||||
/// An identifier that enables us to log useful errors when a circuit is broken
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fs::{self, File},
|
||||
@@ -203,6 +204,27 @@ pub fn overwrite(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Syncs the filesystem for the given file descriptor.
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))]
|
||||
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
|
||||
// Linux guarantees durability for syncfs.
|
||||
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
use anyhow::Context;
|
||||
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
// macOS is not a production platform for Neon, don't even bother.
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
{
|
||||
compile_error!("Unsupported OS");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -249,8 +249,10 @@ macro_rules! id_newtype {
|
||||
};
|
||||
}
|
||||
|
||||
/// Neon timeline IDs are different from PostgreSQL timeline
|
||||
/// IDs. They serve a similar purpose though: they differentiate
|
||||
/// Neon timeline ID.
|
||||
///
|
||||
/// They are different from PostgreSQL timeline
|
||||
/// IDs, but serve a similar purpose: they differentiate
|
||||
/// between different "histories" of the same cluster. However,
|
||||
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
|
||||
/// 32-bits wide, and they must be in ascending order in any given
|
||||
|
||||
@@ -100,7 +100,9 @@ pub enum LockFileRead {
|
||||
}
|
||||
|
||||
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
|
||||
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
|
||||
/// inspect its content.
|
||||
///
|
||||
/// It is not an `Err(...)` if the file does not exist or is already locked.
|
||||
/// Check the [`LockFileRead`] variants for details.
|
||||
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
|
||||
let res = fs::OpenOptions::new().read(true).open(path);
|
||||
|
||||
@@ -190,7 +190,7 @@ impl Drop for TracingPanicHookGuard {
|
||||
}
|
||||
|
||||
/// Named symbol for our panic hook, which logs the panic.
|
||||
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
|
||||
fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
|
||||
// following rust 1.66.1 std implementation:
|
||||
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
|
||||
let location = info.location();
|
||||
|
||||
@@ -8,6 +8,7 @@ use tracing::{trace, warn};
|
||||
use crate::lsn::Lsn;
|
||||
|
||||
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
|
||||
///
|
||||
/// Serialized in custom flexible key/value format. In replication protocol, it
|
||||
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
|
||||
/// Standby status update / Hot standby feedback messages.
|
||||
|
||||
@@ -65,6 +65,8 @@ impl<T> Poison<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Armed pointer to a [`Poison`].
|
||||
///
|
||||
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
|
||||
/// Once modifications are done, use [`Self::disarm`].
|
||||
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
|
||||
|
||||
@@ -13,10 +13,11 @@ pub struct ShardNumber(pub u8);
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
|
||||
pub struct ShardCount(pub u8);
|
||||
|
||||
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
|
||||
/// when we need to know which shard we're dealing with, but do not need to know the full
|
||||
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
|
||||
/// the fully qualified TenantShardId.
|
||||
/// Combination of ShardNumber and ShardCount.
|
||||
///
|
||||
/// For use within the context of a particular tenant, when we need to know which shard we're
|
||||
/// dealing with, but do not need to know the full ShardIdentity (because we won't be doing
|
||||
/// any page->shard mapping), and do not need to know the fully qualified TenantShardId.
|
||||
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
|
||||
pub struct ShardIndex {
|
||||
pub shard_number: ShardNumber,
|
||||
|
||||
@@ -49,12 +49,11 @@ use std::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use tokio::sync::watch;
|
||||
|
||||
///
|
||||
/// Rcu allows multiple readers to read and hold onto a value without blocking
|
||||
/// (for very long). Storing to the Rcu updates the value, making new readers
|
||||
/// immediately see the new value, but it also waits for all current readers to
|
||||
/// finish.
|
||||
/// (for very long).
|
||||
///
|
||||
/// Storing to the Rcu updates the value, making new readers immediately see
|
||||
/// the new value, but it also waits for all current readers to finish.
|
||||
pub struct Rcu<V> {
|
||||
inner: RwLock<RcuInner<V>>,
|
||||
}
|
||||
|
||||
@@ -5,7 +5,9 @@ use std::sync::{
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
|
||||
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
|
||||
/// `SemaphorePermit`.
|
||||
///
|
||||
/// Allows use of `take` which does not require holding an outer mutex guard
|
||||
/// for the duration of initialization.
|
||||
///
|
||||
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].
|
||||
|
||||
@@ -7,6 +7,7 @@ pub enum VecMapOrdering {
|
||||
}
|
||||
|
||||
/// Ordered map datastructure implemented in a Vec.
|
||||
///
|
||||
/// Append only - can only add keys that are larger than the
|
||||
/// current max key.
|
||||
/// Ordering can be adjusted using [`VecMapOrdering`]
|
||||
|
||||
@@ -6,9 +6,10 @@ pub enum YieldingLoopError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
|
||||
/// yields to avoid blocking the executor, and after resuming checks the provided
|
||||
/// cancellation token to drop out promptly on shutdown.
|
||||
/// Helper for long synchronous loops, e.g. over all tenants in the system.
|
||||
///
|
||||
/// Periodically yields to avoid blocking the executor, and after resuming
|
||||
/// checks the provided cancellation token to drop out promptly on shutdown.
|
||||
#[inline(always)]
|
||||
pub async fn yielding_loop<I, T, F>(
|
||||
interval: usize,
|
||||
|
||||
@@ -1,2 +1,20 @@
|
||||
pub mod mgmt_api;
|
||||
pub mod page_service;
|
||||
|
||||
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
|
||||
// If file structure is per-kind not per-feature then where to put this?
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum BlockUnblock {
|
||||
Block,
|
||||
Unblock,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BlockUnblock {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let s = match self {
|
||||
BlockUnblock::Block => "block",
|
||||
BlockUnblock::Unblock => "unblock",
|
||||
};
|
||||
f.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ use utils::{
|
||||
|
||||
pub use reqwest::Body as ReqwestBody;
|
||||
|
||||
use crate::BlockUnblock;
|
||||
|
||||
pub mod util;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -454,6 +456,20 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
|
||||
self.mgmt_api_endpoint,
|
||||
);
|
||||
|
||||
self.request(Method::POST, &uri, ()).await.map(|_| ())
|
||||
}
|
||||
|
||||
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/reset",
|
||||
|
||||
@@ -142,11 +142,16 @@ impl PagestreamClient {
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
for i in 0..10 {
|
||||
let mut req = tokio_stream::once(Ok(req.clone()));
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
}
|
||||
|
||||
for i in 0..9 {
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
}
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ use pageserver::{
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::failpoint_support;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
use utils::{
|
||||
@@ -155,23 +156,7 @@ fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let started = Instant::now();
|
||||
// Linux guarantees durability for syncfs.
|
||||
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
use std::os::fd::AsRawFd;
|
||||
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
// macOS is not a production platform for Neon, don't even bother.
|
||||
drop(dirfd);
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
{
|
||||
compile_error!("Unsupported OS");
|
||||
}
|
||||
|
||||
syncfs(dirfd)?;
|
||||
let elapsed = started.elapsed();
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
|
||||
@@ -180,6 +180,8 @@ pub struct PageServerConf {
|
||||
pub io_buffer_alignment: usize,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
///
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
/// and/or serialized at a whim, while the token is secret. Currently this token is the
|
||||
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
//! This module defines `RequestContext`, a structure that we use throughout
|
||||
//! the pageserver to propagate high-level context from places
|
||||
//! that _originate_ activity down to the shared code paths at the
|
||||
//! heart of the pageserver. It's inspired by Golang's `context.Context`.
|
||||
//! Defines [`RequestContext`].
|
||||
//!
|
||||
//! It is a structure that we use throughout the pageserver to propagate
|
||||
//! high-level context from places that _originate_ activity down to the
|
||||
//! shared code paths at the heart of the pageserver. It's inspired by
|
||||
//! Golang's `context.Context`.
|
||||
//!
|
||||
//! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
|
||||
//! 1. What high-level activity ([`TaskKind`]) needs this page?
|
||||
|
||||
@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
ctx: &'c RequestContext,
|
||||
start: std::time::Instant,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
elapsed
|
||||
}
|
||||
};
|
||||
self.global_metric.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(timeline_metric) = self.timeline_metric {
|
||||
timeline_metric.observe(ex_throttled.as_secs_f64());
|
||||
for _ in 0..self.count {
|
||||
self.global_metric.observe(ex_throttled.as_secs_f64());
|
||||
if let Some(timeline_metric) = self.timeline_metric {
|
||||
timeline_metric.observe(ex_throttled.as_secs_f64());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + '_> {
|
||||
self.start_timer_many(op, 1, ctx)
|
||||
}
|
||||
pub(crate) fn start_timer_many<'c: 'a, 'a>(
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + '_> {
|
||||
let global_metric = &self.global_metrics[op as usize];
|
||||
let start = Instant::now();
|
||||
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
|
||||
ctx,
|
||||
start,
|
||||
op,
|
||||
count,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
|
||||
Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_consecutive_nonblocking_getpage_requests",
|
||||
"Number of consecutive nonblocking getpage requests",
|
||||
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||
let _guard = SERIALIZE.lock().unwrap();
|
||||
|
||||
@@ -5,14 +5,14 @@ use anyhow::Context;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::TenantState;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use pageserver_api::models::{self, TenantState};
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
|
||||
PagestreamNblocksResponse, PagestreamProtocolVersion,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
|
||||
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
|
||||
PagestreamProtocolVersion,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
@@ -43,7 +43,7 @@ use crate::basebackup;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics;
|
||||
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
@@ -577,124 +577,317 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
msg = pgb.read_message() => { msg }
|
||||
};
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => break,
|
||||
Some(m) => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unexpected message: {m:?} during COPY"
|
||||
)));
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
let mut batched = None;
|
||||
'outer: loop {
|
||||
enum DebouncedFeMessage {
|
||||
Exists(models::PagestreamExistsRequest),
|
||||
Nblocks(models::PagestreamNblocksRequest),
|
||||
GetPage {
|
||||
span: Span,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
effective_request_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
},
|
||||
DbSize(models::PagestreamDbSizeRequest),
|
||||
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
|
||||
RespondError(Span, PageStreamError),
|
||||
}
|
||||
let mut debounce: Option<std::time::Instant> = None;
|
||||
// return or `?` on protocol error
|
||||
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
|
||||
let next_batched: Option<DebouncedFeMessage> = loop {
|
||||
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
|
||||
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
|
||||
.unwrap()
|
||||
.into()
|
||||
});
|
||||
let sleep_fut = if let Some(started_at) = debounce {
|
||||
futures::future::Either::Left(tokio::time::sleep_until(
|
||||
(started_at + *BOUNCE_TIMEOUT).into(),
|
||||
))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::future::pending())
|
||||
};
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
msg = pgb.read_message() => {
|
||||
msg
|
||||
}
|
||||
_ = sleep_fut => {
|
||||
assert!(batched.is_some());
|
||||
break None;
|
||||
}
|
||||
};
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => break 'outer,
|
||||
Some(m) => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unexpected message: {m:?} during COPY"
|
||||
)));
|
||||
}
|
||||
None => break 'outer, // client disconnected
|
||||
};
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
// parse request
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
// parse request
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
let this_msg = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
|
||||
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
|
||||
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
|
||||
PagestreamFeMessage::GetSlruSegment(msg) => {
|
||||
DebouncedFeMessage::GetSlruSegment(msg)
|
||||
}
|
||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
rel,
|
||||
blkno,
|
||||
}) => {
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
|
||||
let key = rel_block_to_key(rel, blkno);
|
||||
let shard = match self
|
||||
.timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(GetActiveTimelineError::Tenant(
|
||||
GetActiveTenantError::NotFound(_),
|
||||
)) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
break Some(DebouncedFeMessage::RespondError(
|
||||
span,
|
||||
PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
),
|
||||
));
|
||||
}
|
||||
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
|
||||
};
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
&shard.get_latest_gc_cutoff_lsn(),
|
||||
&ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
break Some(DebouncedFeMessage::RespondError(span, e));
|
||||
}
|
||||
};
|
||||
DebouncedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![(rel, blkno)],
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// check if we can debounce
|
||||
match (&mut batched, this_msg) {
|
||||
(None, this_msg) => {
|
||||
batched = Some(this_msg);
|
||||
}
|
||||
(
|
||||
Some(DebouncedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
}),
|
||||
DebouncedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
},
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
|
||||
return false;
|
||||
}
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
{
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logig for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
// the vectored get currently only supports a single LSN, so, bounce as soon
|
||||
// as the effective request_lsn changes
|
||||
return *accum_lsn == this_lsn;
|
||||
}
|
||||
.await =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
}
|
||||
(Some(_), this_msg) => {
|
||||
// by default, don't continue batching
|
||||
break Some(this_msg);
|
||||
}
|
||||
}
|
||||
|
||||
// debounce impl piece
|
||||
let started_at = debounce.get_or_insert_with(Instant::now);
|
||||
if started_at.elapsed() > *BOUNCE_TIMEOUT {
|
||||
break None;
|
||||
}
|
||||
};
|
||||
|
||||
// invoke handler function
|
||||
let (handler_result, span) = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let (handler_results, span): (
|
||||
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
|
||||
_,
|
||||
) = match batched.take().expect("loop above ensures this") {
|
||||
DebouncedFeMessage::Exists(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
DebouncedFeMessage::Nblocks(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
DebouncedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
} => {
|
||||
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
|
||||
span.record("batch_size", pages.len() as u64);
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
// shard_id is filled in by the handler
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
{
|
||||
let npages = pages.len();
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
res
|
||||
},
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
DebouncedFeMessage::DbSize(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
smallvec::smallvec![
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
DebouncedFeMessage::GetSlruSegment(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
|
||||
smallvec::smallvec![
|
||||
self.handle_get_slru_segment_request(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&req,
|
||||
&ctx
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
.await
|
||||
],
|
||||
span,
|
||||
)
|
||||
}
|
||||
DebouncedFeMessage::RespondError(span, e) => {
|
||||
// We've already decided to respond with an error, so we don't need to
|
||||
// call the handler.
|
||||
(smallvec::smallvec![Err(e)], span)
|
||||
}
|
||||
};
|
||||
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
PageStreamError::Reconnect(reason) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
let full = utils::error::report_compact_sources(&e);
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok(response_msg) => response_msg,
|
||||
};
|
||||
for handler_result in handler_results {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
PageStreamError::Reconnect(reason) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
let full = utils::error::report_compact_sources(&e);
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok(response_msg) => response_msg,
|
||||
};
|
||||
|
||||
// marshal & transmit response message
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
// marshal & transmit response message
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
@@ -706,6 +899,9 @@ impl PageServerHandler {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(batched.is_none(), "we take() earlier");
|
||||
batched = next_batched;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -949,60 +1145,30 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_get_page_at_lsn_request_batched(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetPageRequest,
|
||||
timeline: &Timeline,
|
||||
effective_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = match self
|
||||
.timeline_handles
|
||||
.get(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(tl) => tl,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let _timer = timeline.query_metrics.start_timer_many(
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
pages.len(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
);
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
let pages = timeline
|
||||
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
|
||||
.await;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
|
||||
page.map(|page| {
|
||||
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
|
||||
})
|
||||
.map_err(PageStreamError::Read)
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -1499,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
);
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
}
|
||||
|
||||
struct WaitedForLsn(Lsn);
|
||||
impl From<WaitedForLsn> for Lsn {
|
||||
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
|
||||
lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,12 +9,17 @@
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use crate::{aux_file, repository::*};
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -191,26 +196,184 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
let pages = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
|
||||
assert_eq!(res.len(), 1);
|
||||
res.into_iter().next().unwrap()
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, version, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let request_lsn = match version {
|
||||
Version::Lsn(lsn) => lsn,
|
||||
Version::Modified(_) => panic!("unsupported"),
|
||||
};
|
||||
enum KeyState {
|
||||
NeedsVectoredGet,
|
||||
Done(Result<Bytes, PageReconstructError>),
|
||||
}
|
||||
let mut key_states = BTreeMap::new();
|
||||
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
|
||||
smallvec::SmallVec::with_capacity(pages.len());
|
||||
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
use std::collections::btree_map::Entry;
|
||||
let key_state_slot = match key_states.entry((key, response_order)) {
|
||||
Entry::Occupied(_entry) => unreachable!(
|
||||
"enumerate makes keys unique, even if batch contains same key twice"
|
||||
),
|
||||
Entry::Vacant(entry) => entry,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
version.get(self, key, ctx).await
|
||||
if tag.relnode == 0 {
|
||||
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
))));
|
||||
continue;
|
||||
}
|
||||
|
||||
let nblocks = match self.get_rel_size(tag, version, ctx).await {
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
key_state_slot.insert(KeyState::Done(Err(err)));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
|
||||
continue;
|
||||
}
|
||||
|
||||
vectored_gets.push(key);
|
||||
key_state_slot.insert(KeyState::NeedsVectoredGet);
|
||||
}
|
||||
// turn vectored_gets into a keyspace
|
||||
let keyspace = {
|
||||
// add_key reuqires monotonicity
|
||||
vectored_gets.sort_unstable();
|
||||
let mut acc = KeySpaceAccum::new();
|
||||
for key in vectored_gets
|
||||
.into_iter()
|
||||
// in fact it requires strong monotonicity
|
||||
.dedup()
|
||||
{
|
||||
acc.add_key(key);
|
||||
}
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self.get_vectored(keyspace, request_lsn, ctx).await {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
if let Err(err) = &res {
|
||||
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
|
||||
}
|
||||
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
|
||||
let first_interest = interests.next().unwrap();
|
||||
let next_interest = interests.peek().is_some();
|
||||
if !next_interest {
|
||||
match first_interest.1 {
|
||||
KeyState::NeedsVectoredGet => {
|
||||
*first_interest.1 = KeyState::Done(res);
|
||||
}
|
||||
KeyState::Done(_) => unreachable!(),
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
|
||||
match state {
|
||||
KeyState::NeedsVectoredGet => {
|
||||
*state = KeyState::Done(match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
// this `match` is working around the fact that we cannot Clone the PageReconstructError
|
||||
Err(err) => Err(match err {
|
||||
PageReconstructError::Cancelled => {
|
||||
PageReconstructError::Cancelled
|
||||
}
|
||||
|
||||
x @ PageReconstructError::Other(_) |
|
||||
x @ PageReconstructError::AncestorLsnTimeout(_) |
|
||||
x @ PageReconstructError::WalRedo(_) |
|
||||
x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
KeyState::Done(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
for ((_, _), state) in key_states.iter_mut() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
match &err {
|
||||
GetVectoredError::Cancelled => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::MissingKey(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::GetReadyAncestorError(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
|
||||
}
|
||||
// TODO: restructure get_vectored API to make this error per-key
|
||||
GetVectoredError::Other(err) => {
|
||||
*state = KeyState::Done(Err(PageReconstructError::Other(
|
||||
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
|
||||
)));
|
||||
}
|
||||
// TODO: we can prevent this error class by moving this check into the type system
|
||||
GetVectoredError::InvalidLsn(e) => {
|
||||
*state =
|
||||
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
|
||||
}
|
||||
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
|
||||
// TODO: we can prevent this error class by moving this check into the type system
|
||||
GetVectoredError::Oversized(err) => {
|
||||
*state = KeyState::Done(Err(anyhow::anyhow!(
|
||||
"batching oversized: {err:?}"
|
||||
)
|
||||
.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// get the results into the order in which they were requested
|
||||
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
|
||||
smallvec::SmallVec::with_capacity(key_states.len());
|
||||
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
|
||||
return_order.sort_unstable_by_key(|(_, idx)| *idx);
|
||||
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
|
||||
res.extend(return_order.into_iter().map(|key_states_key| {
|
||||
match key_states.remove(&key_states_key).unwrap() {
|
||||
KeyState::Done(res) => res,
|
||||
KeyState::NeedsVectoredGet => unreachable!(),
|
||||
}
|
||||
}));
|
||||
res
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
@@ -1021,9 +1184,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// DatadirModification represents an operation to ingest an atomic set of
|
||||
/// updates to the repository. It is created by the 'begin_record'
|
||||
/// function. It is called for each WAL record, so that all the modifications
|
||||
/// by a one WAL record appear atomic.
|
||||
/// updates to the repository.
|
||||
///
|
||||
/// It is created by the 'begin_record' function. It is called for each WAL
|
||||
/// record, so that all the modifications by a one WAL record appear atomic.
|
||||
pub struct DatadirModification<'a> {
|
||||
/// The timeline this modification applies to. You can access this to
|
||||
/// read the state, but note that any pending updates are *not* reflected
|
||||
@@ -2048,6 +2212,7 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
/// This struct facilitates accessing either a committed key from the timeline at a
|
||||
/// specific LSN, or the latest uncommitted key from a pending modification.
|
||||
///
|
||||
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
|
||||
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
|
||||
/// need to look up the keys in the modification first before looking them up in the
|
||||
|
||||
@@ -73,6 +73,21 @@ impl ValueBytes {
|
||||
|
||||
Ok(raw[8] == 1)
|
||||
}
|
||||
|
||||
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
|
||||
if raw.len() < 12 {
|
||||
return Err(InvalidInput::TooShortValue);
|
||||
}
|
||||
|
||||
let value_discriminator = &raw[0..4];
|
||||
|
||||
if value_discriminator == [0, 0, 0, 0] {
|
||||
// Value::Image always initializes
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
//! Timeline repository implementation that keeps old data in layer files, and
|
||||
//! the recent changes in ephemeral files.
|
||||
//!
|
||||
//! Timeline repository implementation that keeps old data in files on disk, and
|
||||
//! the recent changes in memory. See tenant/*_layer.rs files.
|
||||
//! The functions here are responsible for locating the correct layer for the
|
||||
//! get/put call, walking back the timeline branching history as needed.
|
||||
//! See tenant/*_layer.rs files. The functions here are responsible for locating
|
||||
//! the correct layer for the get/put call, walking back the timeline branching
|
||||
//! history as needed.
|
||||
//!
|
||||
//! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
|
||||
//! directory. See docs/pageserver-storage.md for how the files are managed.
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
//! Describes the legacy now hopefully no longer modified per-timeline metadata stored in
|
||||
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
|
||||
//! this struct and it's original serialization format is still needed because they were written a
|
||||
//! long time ago.
|
||||
//! Describes the legacy now hopefully no longer modified per-timeline metadata.
|
||||
//!
|
||||
//! It is stored in `index_part.json` managed by [`remote_timeline_client`]. For many tenants and
|
||||
//! their timelines, this struct and its original serialization format is still needed because
|
||||
//! they were written a long time ago.
|
||||
//!
|
||||
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
|
||||
//! versioning.
|
||||
|
||||
@@ -282,9 +282,10 @@ impl BackgroundPurges {
|
||||
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
/// The TenantManager is responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
|
||||
/// lives inside the TenantManager.
|
||||
/// Responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver has state for.
|
||||
///
|
||||
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
|
||||
///
|
||||
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
|
||||
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
|
||||
@@ -2346,8 +2347,9 @@ pub enum TenantMapError {
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
/// Guards a particular tenant_id's content in the TenantsMap. While this
|
||||
/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
|
||||
/// Guards a particular tenant_id's content in the TenantsMap.
|
||||
///
|
||||
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
|
||||
/// for this tenant, which acts as a marker for any operations targeting
|
||||
/// this tenant to retry later, or wait for the InProgress state to end.
|
||||
///
|
||||
|
||||
@@ -2184,6 +2184,8 @@ pub fn remote_timeline_path(
|
||||
remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
|
||||
}
|
||||
|
||||
/// Obtains the path of the given Layer in the remote
|
||||
///
|
||||
/// Note that the shard component of a remote layer path is _not_ always the same
|
||||
/// as in the TenantShardId of the caller: tenants may reference layers from a different
|
||||
/// ShardIndex. Use the ShardIndex from the layer's metadata.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//! In-memory index to track the tenant files on the remote storage.
|
||||
//!
|
||||
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
|
||||
//! remote timeline layers and its metadata.
|
||||
|
||||
|
||||
@@ -8,15 +8,17 @@ mod layer_desc;
|
||||
mod layer_name;
|
||||
pub mod merge_iterator;
|
||||
|
||||
use tokio::sync::{self};
|
||||
use utils::bin_ser::BeSer;
|
||||
pub mod split_writer;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::repository::Value;
|
||||
use crate::repository::{Value, ValueBytes};
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::{Key, DBDIR_KEY};
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::ops::Range;
|
||||
@@ -79,12 +81,18 @@ pub(crate) enum ValueReconstructSituation {
|
||||
}
|
||||
|
||||
/// Reconstruct data accumulated for a single key during a vectored get
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct VectoredValueReconstructState {
|
||||
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub(crate) img: Option<(Lsn, Bytes)>,
|
||||
pub(crate) records: Vec<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
pub(crate) img: Option<(
|
||||
Lsn,
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
|
||||
situation: ValueReconstructSituation,
|
||||
pub(crate) situation: ValueReconstructSituation,
|
||||
}
|
||||
|
||||
impl VectoredValueReconstructState {
|
||||
@@ -93,16 +101,57 @@ impl VectoredValueReconstructState {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<VectoredValueReconstructState> for ValueReconstructState {
|
||||
fn from(mut state: VectoredValueReconstructState) -> Self {
|
||||
// walredo expects the records to be descending in terms of Lsn
|
||||
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
|
||||
pub(crate) async fn convert(
|
||||
_key: Key,
|
||||
from: VectoredValueReconstructState,
|
||||
) -> Result<ValueReconstructState, PageReconstructError> {
|
||||
let mut to = ValueReconstructState::default();
|
||||
|
||||
ValueReconstructState {
|
||||
records: state.records,
|
||||
img: state.img,
|
||||
for (lsn, fut) in from.records {
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
let value = Value::des(&bytes)
|
||||
.map_err(|err| PageReconstructError::Other(err.into()))?;
|
||||
|
||||
match value {
|
||||
Value::WalRecord(rec) => {
|
||||
to.records.push((lsn, rec));
|
||||
},
|
||||
Value::Image(img) => {
|
||||
assert!(to.img.is_none());
|
||||
to.img = Some((lsn, img));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if to.img.is_none() {
|
||||
let (lsn, fut) = from.img.expect("Need an image");
|
||||
match fut.await {
|
||||
Ok(res) => match res {
|
||||
Ok(bytes) => {
|
||||
to.img = Some((lsn, bytes));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(PageReconstructError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(to)
|
||||
}
|
||||
|
||||
/// Bag of data accumulated during a vectored get..
|
||||
@@ -200,7 +249,8 @@ impl ValuesReconstructState {
|
||||
&mut self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
value: Value,
|
||||
completes: bool,
|
||||
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
) -> ValueReconstructSituation {
|
||||
let state = self
|
||||
.keys
|
||||
@@ -208,31 +258,14 @@ impl ValuesReconstructState {
|
||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||
|
||||
if let Ok(state) = state {
|
||||
let key_done = match state.situation {
|
||||
match state.situation {
|
||||
ValueReconstructSituation::Complete => unreachable!(),
|
||||
ValueReconstructSituation::Continue => match value {
|
||||
Value::Image(img) => {
|
||||
state.img = Some((lsn, img));
|
||||
true
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
debug_assert!(
|
||||
Some(lsn) > state.get_cached_lsn(),
|
||||
"Attempt to collect a record below cached LSN for walredo: {} < {}",
|
||||
lsn,
|
||||
state
|
||||
.get_cached_lsn()
|
||||
.expect("Assertion can only fire if a cached lsn is present")
|
||||
);
|
||||
ValueReconstructSituation::Continue => {
|
||||
state.records.push((lsn, value));
|
||||
}
|
||||
}
|
||||
|
||||
let will_init = rec.will_init();
|
||||
state.records.push((lsn, rec));
|
||||
will_init
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if key_done && state.situation == ValueReconstructSituation::Continue {
|
||||
if completes && state.situation == ValueReconstructSituation::Continue {
|
||||
state.situation = ValueReconstructSituation::Complete;
|
||||
self.keys_done.add_key(*key);
|
||||
}
|
||||
@@ -434,10 +467,11 @@ impl ReadableLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
|
||||
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
|
||||
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
|
||||
/// be used for cache management but not for correctness-critical checks.
|
||||
/// Layers contain a hint indicating whether they are likely to be used for reads.
|
||||
///
|
||||
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
|
||||
/// when changing the visibility of layers (for example when creating a branch that makes some previously
|
||||
/// covered layers visible). It should be used for cache management but not for correctness-critical checks.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum LayerVisibilityHint {
|
||||
/// A Visible layer might be read while serving a read, because there is not an image layer between it
|
||||
|
||||
@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{self, OnceCell};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tracing::*;
|
||||
|
||||
@@ -136,10 +135,11 @@ impl Summary {
|
||||
// Flag indicating that this version initialize the page
|
||||
const WILL_INIT: u64 = 1;
|
||||
|
||||
/// Struct representing reference to BLOB in layers. Reference contains BLOB
|
||||
/// offset, and for WAL records it also contains `will_init` flag. The flag
|
||||
/// helps to determine the range of records that needs to be applied, without
|
||||
/// reading/deserializing records themselves.
|
||||
/// Struct representing reference to BLOB in layers.
|
||||
///
|
||||
/// Reference contains BLOB offset, and for WAL records it also contains
|
||||
/// `will_init` flag. The flag helps to determine the range of records
|
||||
/// that needs to be applied, without reading/deserializing records themselves.
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
pub struct BlobRef(pub u64);
|
||||
|
||||
@@ -223,7 +223,7 @@ pub struct DeltaLayerInner {
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
layer_key_range: Range<Key>,
|
||||
@@ -787,9 +787,11 @@ impl DeltaLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
@@ -979,77 +981,59 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let mut ignore_key_with_err = None;
|
||||
|
||||
let max_vectored_read_bytes = self
|
||||
.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into();
|
||||
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
// Note that reads are processed in reverse order (from highest key+lsn).
|
||||
// This is the order that `ReconstructState` requires such that it can
|
||||
// track when a key is done.
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
|
||||
.await;
|
||||
|
||||
let blobs_buf = match res {
|
||||
Ok(blobs_buf) => blobs_buf,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::Other(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key_with_err {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
|
||||
let value = match value {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
self.file.path,
|
||||
))),
|
||||
);
|
||||
|
||||
ignore_key_with_err = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
|
||||
// state, no further updates shall be made to it. The call below will
|
||||
// panic if the invariant is violated.
|
||||
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = sync::oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
blob_meta.will_init,
|
||||
rx,
|
||||
);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1189,7 +1173,14 @@ impl DeltaLayerInner {
|
||||
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
|
||||
let end_offset = offset;
|
||||
|
||||
Some((BlobMeta { key, lsn }, start_offset..end_offset))
|
||||
Some((
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
start_offset..end_offset,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
//! An ImageLayer represents an image or a snapshot of a key-range at
|
||||
//! one particular LSN. It contains an image of all key-value pairs
|
||||
//! in its key-range. Any key that falls into the image layer's range
|
||||
//! but does not exist in the layer, does not exist.
|
||||
//! one particular LSN.
|
||||
//!
|
||||
//! It contains an image of all key-value pairs in its key-range. Any key
|
||||
//! that falls into the image layer's range but does not exist in the layer,
|
||||
//! does not exist.
|
||||
//!
|
||||
//! An image layer is stored in a file on disk. The file is stored in
|
||||
//! timelines/<timeline_id> directory. Currently, there are no
|
||||
@@ -19,7 +21,7 @@
|
||||
//!
|
||||
//! Every image layer file consists of three parts: "summary",
|
||||
//! "index", and "values". The summary is a fixed size header at the
|
||||
//! beginning of the file, and it contains basic information about the
|
||||
//! beginningof the file, and it contains basic information about the
|
||||
//! layer, and offsets to the other parts. The "index" is a B-tree,
|
||||
//! mapping from Key to an offset in the "values" part. The
|
||||
//! actual page images are stored in the "values" part.
|
||||
@@ -36,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -50,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
@@ -161,7 +164,7 @@ pub struct ImageLayerInner {
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
|
||||
file: VirtualFile,
|
||||
file: Arc<VirtualFile>,
|
||||
file_id: FileId,
|
||||
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
@@ -388,9 +391,11 @@ impl ImageLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?,
|
||||
);
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader
|
||||
@@ -577,8 +582,16 @@ impl ImageLayerInner {
|
||||
.0
|
||||
.into();
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
for read in reads.into_iter() {
|
||||
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
|
||||
Default::default();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
senders.insert((blob_meta.key, blob_meta.lsn), tx);
|
||||
|
||||
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
|
||||
}
|
||||
|
||||
let buf_size = read.size();
|
||||
|
||||
if buf_size > max_vectored_read_bytes {
|
||||
@@ -597,36 +610,33 @@ impl ImageLayerInner {
|
||||
);
|
||||
}
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
|
||||
let read_from = self.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let buf = &blobs_buf.buf[meta.start..meta.end];
|
||||
let sender = senders
|
||||
.remove(&(meta.meta.key, meta.meta.lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
|
||||
}
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
assert!(senders.is_empty());
|
||||
}
|
||||
Err(err) => {
|
||||
for (_, sender) in senders {
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::{l0_flush, page_cache};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||
};
|
||||
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
|
||||
|
||||
pub(crate) mod vectored_dio_read;
|
||||
|
||||
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
|
||||
|
||||
resource_units: GlobalResourceUnits,
|
||||
}
|
||||
@@ -381,7 +378,11 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
pub(crate) fn try_len(&self) -> Option<u64> {
|
||||
self.inner.try_read().map(|i| i.file.len()).ok()
|
||||
self.inner
|
||||
.try_read()
|
||||
.map(|i| i.file.try_read().map(|i| i.len()).ok())
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
pub(crate) fn assert_writable(&self) {
|
||||
@@ -432,6 +433,10 @@ impl InMemoryLayer {
|
||||
read: vectored_dio_read::LogicalRead<Vec<u8>>,
|
||||
}
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
let mut senders: HashMap<
|
||||
(Key, Lsn),
|
||||
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
> = Default::default();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
@@ -459,6 +464,11 @@ impl InMemoryLayer {
|
||||
Vec::with_capacity(len as usize),
|
||||
),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
senders.insert((key, *entry_lsn), tx);
|
||||
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
|
||||
|
||||
if will_init {
|
||||
break;
|
||||
}
|
||||
@@ -466,46 +476,42 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the reads.
|
||||
let read_from = inner.file.clone();
|
||||
let read_ctx = ctx.attached_child();
|
||||
tokio::task::spawn(async move {
|
||||
let locked = read_from.read().await;
|
||||
let f = vectored_dio_read::execute(
|
||||
&*locked,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&read_ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
let f = vectored_dio_read::execute(
|
||||
&inner.file,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
&ctx,
|
||||
);
|
||||
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
|
||||
.await;
|
||||
|
||||
// Process results into the reconstruct state
|
||||
'next_key: for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
}
|
||||
Ok(value_buf) => {
|
||||
let value = Value::des(&value_buf);
|
||||
if let Err(e) = value {
|
||||
reconstruct_state
|
||||
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
let sender = senders
|
||||
.remove(&(key, entry_lsn))
|
||||
.expect("sender must exist");
|
||||
let _ = sender
|
||||
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
|
||||
}
|
||||
|
||||
let key_situation =
|
||||
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
|
||||
if key_situation == ValueReconstructSituation::Complete {
|
||||
// TODO: metric to see if we fetched more values than necessary
|
||||
continue 'next_key;
|
||||
Ok(value_buf) => {
|
||||
let _ = sender.send(Ok(value_buf.into()));
|
||||
}
|
||||
|
||||
// process the next value in the next iteration of the loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(senders.is_empty());
|
||||
});
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
|
||||
|
||||
@@ -600,7 +606,8 @@ impl InMemoryLayer {
|
||||
/// Get layer size.
|
||||
pub async fn size(&self) -> Result<u64> {
|
||||
let inner = self.inner.read().await;
|
||||
Ok(inner.file.len())
|
||||
let locked = inner.file.try_read().expect("no contention");
|
||||
Ok(locked.len())
|
||||
}
|
||||
|
||||
/// Create a new, empty, in-memory layer
|
||||
@@ -614,9 +621,10 @@ impl InMemoryLayer {
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file =
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
let file = Arc::new(tokio::sync::RwLock::new(
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
|
||||
));
|
||||
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
@@ -648,7 +656,7 @@ impl InMemoryLayer {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
let base_offset = inner.file.len();
|
||||
let base_offset = inner.file.read().await.len();
|
||||
|
||||
let SerializedBatch {
|
||||
raw,
|
||||
@@ -672,8 +680,13 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
// FIXME: can't borrow arc
|
||||
let new_size = {
|
||||
let mut locked = inner.file.write().await;
|
||||
locked.write_raw(&raw, ctx).await?;
|
||||
locked.len()
|
||||
};
|
||||
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
@@ -713,7 +726,7 @@ impl InMemoryLayer {
|
||||
|
||||
pub(crate) async fn tick(&self) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
let size = inner.file.read().await.len();
|
||||
inner.resource_units.publish_size(size)
|
||||
}
|
||||
|
||||
@@ -809,7 +822,7 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
|
||||
|
||||
let file_contents = Bytes::from(file_contents);
|
||||
|
||||
|
||||
@@ -107,6 +107,8 @@ async fn smoke_test() {
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
|
||||
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
|
||||
assert_eq!(img_before, img_after);
|
||||
|
||||
// evict_and_wait can timeout, but it doesn't cancel the evicting itself
|
||||
|
||||
@@ -12,8 +12,10 @@ use serde::{Deserialize, Serialize};
|
||||
#[cfg(test)]
|
||||
use utils::id::TenantId;
|
||||
|
||||
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
|
||||
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
|
||||
/// A unique identifier of a persistent layer.
|
||||
///
|
||||
/// This is different from `LayerDescriptor`, which is only used in the benchmarks.
|
||||
/// This struct contains all necessary information to find the image / delta layer. It also provides
|
||||
/// a unified way to generate layer information like file name.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
|
||||
pub struct PersistentLayerDesc {
|
||||
|
||||
@@ -217,8 +217,9 @@ impl fmt::Display for ImageLayerName {
|
||||
}
|
||||
}
|
||||
|
||||
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time. The
|
||||
/// LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
|
||||
/// LayerName is the logical identity of a layer within a LayerMap at a moment in time.
|
||||
///
|
||||
/// The LayerName is not a unique filename, as the same LayerName may have multiple physical incarnations
|
||||
/// over time (e.g. across shard splits or compression). The physical filenames of layers in local
|
||||
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
|
||||
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])
|
||||
|
||||
@@ -226,9 +226,11 @@ impl<'a> IteratorWrapper<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A merge iterator over delta/image layer iterators. When duplicated records are
|
||||
/// found, the iterator will not perform any deduplication, and the caller should handle
|
||||
/// these situation. By saying duplicated records, there are many possibilities:
|
||||
/// A merge iterator over delta/image layer iterators.
|
||||
///
|
||||
/// When duplicated records are found, the iterator will not perform any
|
||||
/// deduplication, and the caller should handle these situation. By saying
|
||||
/// duplicated records, there are many possibilities:
|
||||
///
|
||||
/// * Two same delta at the same LSN.
|
||||
/// * Two same image at the same LSN.
|
||||
|
||||
@@ -34,9 +34,10 @@ impl SplitWriterResult {
|
||||
}
|
||||
}
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
|
||||
/// to be cleaned up)
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
///
|
||||
/// The interface does not guarantee atomicity (i.e., if the image layer generation
|
||||
/// fails, there might be leftover files to be cleaned up)
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
inner: ImageLayerWriter,
|
||||
@@ -193,9 +194,10 @@ impl SplitImageLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
|
||||
/// to be cleaned up).
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
|
||||
///
|
||||
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
|
||||
/// there might be leftover files to be cleaned up).
|
||||
///
|
||||
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
|
||||
@@ -18,6 +18,7 @@ use camino::Utf8Path;
|
||||
use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use handle::ShardTimelineId;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
@@ -68,7 +69,9 @@ use crate::{
|
||||
tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
|
||||
storage_layer::{
|
||||
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
|
||||
},
|
||||
},
|
||||
walredo,
|
||||
};
|
||||
@@ -1129,22 +1132,38 @@ impl Timeline {
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
match res {
|
||||
Err(err) => {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let state = ValueReconstructState::from(state);
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
async move {
|
||||
let state = res.expect("Read path is infallible");
|
||||
assert!(matches!(
|
||||
state.situation,
|
||||
ValueReconstructSituation::Complete
|
||||
));
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
let converted = match convert(key, state).await {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
}
|
||||
};
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.await;
|
||||
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -5496,30 +5515,30 @@ impl Timeline {
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn inspect_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
_lsn: Lsn,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
let mut reconstruct_data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
&mut reconstruct_data,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (k, v) in reconstruct_data.keys {
|
||||
all_data.push((k, v?.img.unwrap().1));
|
||||
}
|
||||
}
|
||||
}
|
||||
all_data.sort();
|
||||
Ok(all_data)
|
||||
// let mut all_data = Vec::new();
|
||||
// let guard = self.layers.read().await;
|
||||
// for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
// let layer = guard.get_from_desc(&layer);
|
||||
// let mut reconstruct_data = ValuesReconstructState::default();
|
||||
// layer
|
||||
// .get_values_reconstruct_data(
|
||||
// KeySpace::single(Key::MIN..Key::MAX),
|
||||
// lsn..Lsn(lsn.0 + 1),
|
||||
// &mut reconstruct_data,
|
||||
// ctx,
|
||||
// )
|
||||
// .await?;
|
||||
// for (k, v) in reconstruct_data.keys {
|
||||
// all_data.push((k, v?.img.unwrap().1));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// all_data.sort();
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Get all historic layer descriptors in the layer map
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
pub struct BlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
pub will_init: bool,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
@@ -355,7 +356,8 @@ pub enum BlobFlag {
|
||||
/// * Iterate over the collected blobs and coalesce them into reads at the end
|
||||
pub struct VectoredReadPlanner {
|
||||
// Track all the blob offsets. Start offsets must be ordered.
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
|
||||
// Note: last bool is will_init
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
|
||||
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag)>,
|
||||
|
||||
@@ -420,12 +422,12 @@ impl VectoredReadPlanner {
|
||||
match flag {
|
||||
BlobFlag::None => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, false));
|
||||
}
|
||||
BlobFlag::ReplaceAll => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, true));
|
||||
}
|
||||
BlobFlag::Ignore => {}
|
||||
}
|
||||
@@ -436,11 +438,17 @@ impl VectoredReadPlanner {
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for (key, blobs_for_key) in self.blobs {
|
||||
for (lsn, start_offset, end_offset) in blobs_for_key {
|
||||
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
|
||||
let extended = match &mut current_read_builder {
|
||||
Some(read_builder) => {
|
||||
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
|
||||
}
|
||||
Some(read_builder) => read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
),
|
||||
None => VectoredReadExtended::No,
|
||||
};
|
||||
|
||||
@@ -448,7 +456,11 @@ impl VectoredReadPlanner {
|
||||
let next_read_builder = VectoredReadBuilder::new(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init,
|
||||
},
|
||||
self.max_read_size,
|
||||
self.mode,
|
||||
);
|
||||
@@ -593,8 +605,10 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
|
||||
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
|
||||
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
|
||||
///
|
||||
/// It provides a streaming API for getting read blobs. It returns a batch when
|
||||
/// `handle` gets called and when the current key would just exceed the read_size and
|
||||
/// max_cnt constraints.
|
||||
pub struct StreamingVectoredReadPlanner {
|
||||
read_builder: Option<VectoredReadBuilder>,
|
||||
@@ -663,10 +677,19 @@ impl StreamingVectoredReadPlanner {
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
is_last_blob_in_read: bool,
|
||||
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
) -> Option<VectoredRead> {
|
||||
match &mut self.read_builder {
|
||||
Some(read_builder) => {
|
||||
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
|
||||
let extended = read_builder.extend(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
);
|
||||
assert_eq!(extended, VectoredReadExtended::Yes);
|
||||
}
|
||||
None => {
|
||||
@@ -674,7 +697,11 @@ impl StreamingVectoredReadPlanner {
|
||||
Some(VectoredReadBuilder::new_streaming(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
will_init: false,
|
||||
},
|
||||
self.mode,
|
||||
))
|
||||
};
|
||||
@@ -1006,6 +1033,7 @@ mod tests {
|
||||
let meta = BlobMeta {
|
||||
key: Key::MIN,
|
||||
lsn: Lsn(0),
|
||||
will_init: false,
|
||||
};
|
||||
|
||||
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//!
|
||||
//! VirtualFile is like a normal File, but it's not bound directly to
|
||||
//! a file descriptor. Instead, the file is opened when it's read from,
|
||||
//! a file descriptor.
|
||||
//!
|
||||
//! Instead, the file is opened when it's read from,
|
||||
//! and if too many files are open globally in the system, least-recently
|
||||
//! used ones are closed.
|
||||
//!
|
||||
|
||||
@@ -43,13 +43,12 @@ use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateError;
|
||||
use utils::sync::heavier_once_cell;
|
||||
|
||||
/// The real implementation that uses a Postgres process to
|
||||
/// perform WAL replay.
|
||||
///
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the process at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
///
|
||||
/// Only one thread can use the process at a time, that is controlled by the
|
||||
/// Mutex. In the future, we might want to launch a pool of processes to allow
|
||||
/// concurrent replay of multiple records.
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
|
||||
{
|
||||
/*
|
||||
* However, allow to proceed if previously elected leader was me;
|
||||
* plain restart of walproposer not intervened by concurrent
|
||||
* compute (who could generate WAL) is ok.
|
||||
* However, allow to proceed if last_log_term on the node which gave
|
||||
* the highest vote (i.e. point where we are going to start writing)
|
||||
* actually had been won by me; plain restart of walproposer not
|
||||
* intervened by concurrent compute which wrote WAL is ok.
|
||||
*
|
||||
* This avoids compute crash after manual term_bump.
|
||||
*/
|
||||
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
|
||||
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
|
||||
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
|
||||
if (sk->appendResponse.term > wp->propTerm)
|
||||
{
|
||||
/*
|
||||
* Another compute with higher term is running. Panic to restart
|
||||
* PG as we likely need to retake basebackup. However, don't dump
|
||||
* core as this is kinda expected scenario.
|
||||
*
|
||||
* Term has changed to higher one, probably another compute is
|
||||
* running. If this is the case we could PANIC as well because
|
||||
* likely it inserted some data and our basebackup is unsuitable
|
||||
* anymore. However, we also bump term manually (term_bump endpoint)
|
||||
* on safekeepers for migration purposes, in this case we do want
|
||||
* compute to stay alive. So restart walproposer with FATAL instead
|
||||
* of panicking; if basebackup is spoiled next election will notice
|
||||
* this.
|
||||
*/
|
||||
disable_core_dump();
|
||||
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
|
||||
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
|
||||
sk->host, sk->port,
|
||||
sk->appendResponse.term, wp->propTerm);
|
||||
}
|
||||
|
||||
2
proxy/src/cache/timed_lru.rs
vendored
2
proxy/src/cache/timed_lru.rs
vendored
@@ -16,7 +16,7 @@ use tracing::debug;
|
||||
// On the other hand, `hashlink` has good download stats and appears to be maintained.
|
||||
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
|
||||
|
||||
use super::{common::Cached, *};
|
||||
use super::{common::Cached, timed_lru, Cache};
|
||||
|
||||
/// An implementation of timed LRU cache with fixed capacity.
|
||||
/// Key properties:
|
||||
|
||||
@@ -44,16 +44,14 @@
|
||||
clippy::items_after_statements,
|
||||
)]
|
||||
// List of temporarily allowed lints.
|
||||
// TODO: Switch to except() once stable with 1.81.
|
||||
// TODO: fix code and reduce list or move to permanent list above.
|
||||
#![allow(
|
||||
#![expect(
|
||||
clippy::cargo_common_metadata,
|
||||
clippy::cast_possible_truncation,
|
||||
clippy::cast_possible_wrap,
|
||||
clippy::cast_precision_loss,
|
||||
clippy::cast_sign_loss,
|
||||
clippy::doc_markdown,
|
||||
clippy::implicit_hasher,
|
||||
clippy::inline_always,
|
||||
clippy::match_same_arms,
|
||||
clippy::match_wild_err_arm,
|
||||
@@ -61,21 +59,28 @@
|
||||
clippy::missing_panics_doc,
|
||||
clippy::module_name_repetitions,
|
||||
clippy::needless_pass_by_value,
|
||||
clippy::needless_raw_string_hashes,
|
||||
clippy::redundant_closure_for_method_calls,
|
||||
clippy::return_self_not_must_use,
|
||||
clippy::similar_names,
|
||||
clippy::single_match_else,
|
||||
clippy::struct_excessive_bools,
|
||||
clippy::struct_field_names,
|
||||
clippy::too_many_lines,
|
||||
clippy::unreadable_literal,
|
||||
clippy::unused_async,
|
||||
clippy::unused_self,
|
||||
clippy::wildcard_imports
|
||||
clippy::unused_self
|
||||
)]
|
||||
#![cfg_attr(
|
||||
any(test, feature = "testing"),
|
||||
allow(
|
||||
clippy::needless_raw_string_hashes,
|
||||
clippy::unreadable_literal,
|
||||
clippy::unused_async,
|
||||
)
|
||||
)]
|
||||
// List of temporarily allowed lints to unblock beta/nightly.
|
||||
#![allow(unknown_lints, clippy::manual_inspect)]
|
||||
#![allow(
|
||||
unknown_lints,
|
||||
// TODO: 1.82: Add `use<T>` where necessary and remove from this list.
|
||||
impl_trait_overcaptures,
|
||||
)]
|
||||
|
||||
use std::{convert::Infallible, future::Future};
|
||||
|
||||
|
||||
@@ -217,6 +217,7 @@ impl sasl::Mechanism for Exchange<'_> {
|
||||
self.state = ExchangeState::SaltSent(sent);
|
||||
Ok(Step::Continue(self, msg))
|
||||
}
|
||||
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
|
||||
Step::Success(x, _) => match x {},
|
||||
Step::Failure(msg) => Ok(Step::Failure(msg)),
|
||||
}
|
||||
@@ -224,6 +225,7 @@ impl sasl::Mechanism for Exchange<'_> {
|
||||
ExchangeState::SaltSent(sent) => {
|
||||
match sent.transition(self.secret, &self.tls_server_end_point, input)? {
|
||||
Step::Success(keys, msg) => Ok(Step::Success(keys, msg)),
|
||||
#[allow(unreachable_patterns)] // TODO: 1.82: simply drop this match
|
||||
Step::Continue(x, _) => match x {},
|
||||
Step::Failure(msg) => Ok(Step::Failure(msg)),
|
||||
}
|
||||
|
||||
@@ -745,22 +745,20 @@ impl BatchQueryData {
|
||||
builder = builder.deferrable(true);
|
||||
}
|
||||
|
||||
let transaction = builder.start().await.map_err(|e| {
|
||||
let transaction = builder.start().await.inspect_err(|_| {
|
||||
// if we cannot start a transaction, we should return immediately
|
||||
// and not return to the pool. connection is clearly broken
|
||||
discard.discard();
|
||||
e
|
||||
})?;
|
||||
|
||||
let json_output =
|
||||
match query_batch(cancel.child_token(), &transaction, self, parsed_headers).await {
|
||||
Ok(json_output) => {
|
||||
info!("commit");
|
||||
let status = transaction.commit().await.map_err(|e| {
|
||||
let status = transaction.commit().await.inspect_err(|_| {
|
||||
// if we cannot commit - for now don't return connection to pool
|
||||
// TODO: get a query status from the error
|
||||
discard.discard();
|
||||
e
|
||||
})?;
|
||||
discard.check_idle(status);
|
||||
json_output
|
||||
@@ -776,11 +774,10 @@ impl BatchQueryData {
|
||||
}
|
||||
Err(err) => {
|
||||
info!("rollback");
|
||||
let status = transaction.rollback().await.map_err(|e| {
|
||||
let status = transaction.rollback().await.inspect_err(|_| {
|
||||
// if we cannot rollback - for now don't return connection to pool
|
||||
// TODO: get a query status from the error
|
||||
discard.discard();
|
||||
e
|
||||
})?;
|
||||
discard.check_idle(status);
|
||||
return Err(err);
|
||||
|
||||
@@ -14,6 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
|
||||
/// Stream wrapper which implements libpq's protocol.
|
||||
///
|
||||
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]
|
||||
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
|
||||
/// to pass random malformed bytes through the connection).
|
||||
|
||||
@@ -3,5 +3,5 @@ channel = "1.81.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
# but we also need `llvm-tools-preview` for coverage data merges on CI
|
||||
components = ["llvm-tools-preview", "rustfmt", "clippy"]
|
||||
# but we also need `llvm-tools` for coverage data merges on CI
|
||||
components = ["llvm-tools", "rustfmt", "clippy"]
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use utils::auth::{AuthError, Claims, Scope};
|
||||
use utils::id::TenantId;
|
||||
|
||||
/// If tenant_id is provided, allow if token (claims) is for this tenant or
|
||||
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
|
||||
/// SafekeeperData.
|
||||
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
|
||||
match (&claims.scope, tenant_id) {
|
||||
(Scope::Tenant, None) => Err(AuthError(
|
||||
|
||||
@@ -19,7 +19,7 @@ use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use storage_broker::Uri;
|
||||
|
||||
use tracing::*;
|
||||
@@ -261,6 +261,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Change into the data directory.
|
||||
std::env::set_current_dir(&workdir)?;
|
||||
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
|
||||
info!("claimed pid file at {lock_file_path:?}");
|
||||
// ensure that the lock file is held even if the main thread of the process is panics
|
||||
// we need to release the lock file only when the current process is gone
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
// Set or read our ID.
|
||||
let id = set_id(&workdir, args.id.map(NodeId))?;
|
||||
if args.init {
|
||||
@@ -364,15 +373,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
|
||||
info!("claimed pid file at {lock_file_path:?}");
|
||||
|
||||
// ensure that the lock file is held even if the main thread of the process is panics
|
||||
// we need to release the lock file only when the current process is gone
|
||||
std::mem::forget(lock_file);
|
||||
// fsync the datadir to make sure we have a consistent state on disk.
|
||||
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
|
||||
let started = Instant::now();
|
||||
utils::crashsafe::syncfs(dfd)?;
|
||||
let elapsed = started.elapsed();
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"syncfs data directory done"
|
||||
);
|
||||
|
||||
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
|
||||
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
|
||||
|
||||
@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
|
||||
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
http::{
|
||||
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Make term at least as high as one in request. If one in request is None,
|
||||
/// increment current one.
|
||||
async fn timeline_term_bump_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let response = tli
|
||||
.term_bump(request_data.term)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Used only in tests to hand craft required data.
|
||||
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|
||||
|r| request_span(r, timeline_backup_partial_reset),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
|
||||
|r| request_span(r, timeline_term_bump_handler),
|
||||
)
|
||||
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
|
||||
request_span(r, record_safekeeper_info)
|
||||
})
|
||||
|
||||
@@ -484,6 +484,7 @@ pub async fn validate_temp_timeline(
|
||||
}
|
||||
|
||||
/// Move timeline from a temp directory to the main storage, and load it to the global map.
|
||||
///
|
||||
/// This operation is done under a lock to prevent bugs if several concurrent requests are
|
||||
/// trying to load the same timeline. Note that it doesn't guard against creating the
|
||||
/// timeline with the same ttid, but no one should be doing this anyway.
|
||||
|
||||
@@ -448,8 +448,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx; reading from socket and writing to disk in parallel is
|
||||
/// beneficial for performance, this struct provides writing to disk part.
|
||||
/// replies to reply_tx.
|
||||
///
|
||||
/// Reading from socket and writing to disk in parallel is beneficial for
|
||||
/// performance, this struct provides the writing to disk part.
|
||||
pub struct WalAcceptor {
|
||||
tli: WalResidentTimeline,
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
|
||||
@@ -938,8 +938,9 @@ where
|
||||
}
|
||||
|
||||
trace!(
|
||||
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
|
||||
"processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
|
||||
msg.wal_data.len(),
|
||||
msg.h.begin_lsn,
|
||||
msg.h.end_lsn,
|
||||
msg.h.commit_lsn,
|
||||
msg.h.truncate_lsn,
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
|
||||
//! and its wrapper with in memory layer (SafekeeperState).
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::{cmp::max, ops::Deref};
|
||||
|
||||
use anyhow::Result;
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
@@ -12,7 +13,7 @@ use utils::{
|
||||
|
||||
use crate::{
|
||||
control_file,
|
||||
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
|
||||
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
|
||||
wal_backup_partial::{self},
|
||||
};
|
||||
|
||||
@@ -147,9 +148,11 @@ pub struct TimelineMemState {
|
||||
pub proposer_uuid: PgUuid,
|
||||
}
|
||||
|
||||
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
|
||||
/// when we update fields like commit_lsn which don't need immediate
|
||||
/// persistence. Provides transactional like API to atomically update the state.
|
||||
/// Safekeeper persistent state plus in memory layer.
|
||||
///
|
||||
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
|
||||
/// which don't need immediate persistence. Provides transactional like API
|
||||
/// to atomically update the state.
|
||||
///
|
||||
/// Implements Deref into *persistent* part.
|
||||
pub struct TimelineState<CTRL: control_file::Storage> {
|
||||
@@ -209,6 +212,27 @@ where
|
||||
let s = self.start_change();
|
||||
self.finish_change(&s).await
|
||||
}
|
||||
|
||||
/// Make term at least as `to`. If `to` is None, increment current one. This
|
||||
/// is not in safekeeper.rs because we want to be able to do it even if
|
||||
/// timeline is offloaded.
|
||||
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
|
||||
let before = self.acceptor_state.term;
|
||||
let mut state = self.start_change();
|
||||
let new = match to {
|
||||
Some(to) => max(state.acceptor_state.term, to),
|
||||
None => state.acceptor_state.term + 1,
|
||||
};
|
||||
if new > state.acceptor_state.term {
|
||||
state.acceptor_state.term = new;
|
||||
self.finish_change(&state).await?;
|
||||
}
|
||||
let after = self.acceptor_state.term;
|
||||
Ok(TimelineTermBumpResponse {
|
||||
previous_term: before,
|
||||
current_term: after,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<CTRL> Deref for TimelineState<CTRL>
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use remote_storage::RemotePath;
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -169,6 +170,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> {
|
||||
}
|
||||
|
||||
/// This structure is stored in shared state and represents the state of the timeline.
|
||||
///
|
||||
/// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
|
||||
/// case, SafeKeeper is not available (because WAL is not present on disk) and all
|
||||
/// operations can be done only with control file.
|
||||
@@ -214,6 +216,10 @@ impl StateSK {
|
||||
.get_last_log_term(self.flush_lsn())
|
||||
}
|
||||
|
||||
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
|
||||
self.state_mut().term_bump(to).await
|
||||
}
|
||||
|
||||
/// Close open WAL files to release FDs.
|
||||
fn close_wal_store(&mut self) {
|
||||
if let StateSK::Loaded(sk) = self {
|
||||
@@ -853,6 +859,11 @@ impl Timeline {
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
state.sk.term_bump(to).await
|
||||
}
|
||||
|
||||
/// Get the timeline guard for reading/writing WAL files.
|
||||
/// If WAL files are not present on disk (evicted), they will be automatically
|
||||
/// downloaded from remote storage. This is done in the manager task, which is
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! Code related to evicting WAL files to remote storage. The actual upload is done by the
|
||||
//! partial WAL backup code. This file has code to delete and re-download WAL files,
|
||||
//! cross-validate with partial WAL backup if local file is still present.
|
||||
//! Code related to evicting WAL files to remote storage.
|
||||
//!
|
||||
//! The actual upload is done by the partial WAL backup code. This file has
|
||||
//! code to delete and re-download WAL files, cross-validate with partial WAL
|
||||
//! backup if local file is still present.
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
//! Timeline residence guard is needed to ensure that WAL segments are present on disk,
|
||||
//! Timeline residence guard
|
||||
//!
|
||||
//! It is needed to ensure that WAL segments are present on disk,
|
||||
//! as long as the code is holding the guard. This file implements guard logic, to issue
|
||||
//! and drop guards, and to notify the manager when the guard is dropped.
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//! The timeline manager task is responsible for managing the timeline's background tasks.
|
||||
//!
|
||||
//! It is spawned alongside each timeline and exits when the timeline is deleted.
|
||||
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
|
||||
//! It also can manage some reactive state, like should the timeline be active for broker pushes or not.
|
||||
|
||||
@@ -60,7 +60,8 @@ impl TimelinesSet {
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard is used to add or remove timeline from the set.
|
||||
/// Guard is used to add or remove timelines from the set.
|
||||
///
|
||||
/// If the timeline present in set, it will be removed from it on drop.
|
||||
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
|
||||
/// It is designed to be used in the manager task only.
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
|
||||
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
|
||||
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
|
||||
//! and `flush_lsn` updates.
|
||||
//!
|
||||
//! After the partial segment was updated (`flush_lsn` was changed), the segment
|
||||
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
|
||||
//!
|
||||
//! The filename format for partial segments is
|
||||
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::SafeKeeperConf;
|
||||
use postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
///
|
||||
/// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
|
||||
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
|
||||
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
|
||||
|
||||
@@ -98,7 +98,19 @@ pub struct PhysicalStorage {
|
||||
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
|
||||
write_lsn: Lsn,
|
||||
|
||||
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
|
||||
/// The LSN of the last WAL record written to disk. Still can be not fully
|
||||
/// flushed.
|
||||
///
|
||||
/// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
|
||||
/// switch ingest the reverse is true because we don't bump write_lsn up to
|
||||
/// the next segment: WAL stream from the compute doesn't have the gap and
|
||||
/// for simplicity / as a sanity check we disallow any non-sequential
|
||||
/// writes, so write zeros as is.
|
||||
///
|
||||
/// Similar effect is in theory possible due to LSN alignment: if record
|
||||
/// ends at *2, decoder will report end lsn as *8 even though we haven't
|
||||
/// written these zeros yet. In practice compute likely never sends
|
||||
/// non-aligned chunks of data.
|
||||
write_record_lsn: Lsn,
|
||||
|
||||
/// The LSN of the last WAL record flushed to disk.
|
||||
@@ -167,8 +179,7 @@ impl PhysicalStorage {
|
||||
)
|
||||
};
|
||||
|
||||
// TODO: do we really know that write_lsn is fully flushed to disk?
|
||||
// If not, maybe it's better to call fsync() here to be sure?
|
||||
// note: this assumes we fsync'ed whole datadir on start.
|
||||
let flush_lsn = write_lsn;
|
||||
|
||||
debug!(
|
||||
@@ -440,11 +451,12 @@ impl Storage for PhysicalStorage {
|
||||
.with_label_values(&["truncate_wal"])
|
||||
.start_timer();
|
||||
|
||||
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
|
||||
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
|
||||
// Streaming must not create a hole, so truncate cannot be called on
|
||||
// non-written lsn.
|
||||
if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
|
||||
bail!(
|
||||
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
|
||||
self.write_lsn,
|
||||
"truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
|
||||
self.write_record_lsn,
|
||||
end_pos
|
||||
);
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ class LLVM:
|
||||
# Show a user-friendly warning
|
||||
raise Exception(' '.join([
|
||||
f"It appears that you don't have `{name}` installed.",
|
||||
"Please execute `rustup component add llvm-tools-preview`,",
|
||||
"Please execute `rustup component add llvm-tools`,",
|
||||
"or install it via your package manager of choice.",
|
||||
"LLVM tools should be the same version as LLVM in `rustc --version --verbose`.",
|
||||
]))
|
||||
@@ -518,7 +518,7 @@ def main() -> None:
|
||||
example = f"""
|
||||
prerequisites:
|
||||
# alternatively, install a system package for `llvm-tools`
|
||||
rustup component add llvm-tools-preview
|
||||
rustup component add llvm-tools
|
||||
|
||||
self-contained example:
|
||||
{app} run make
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE tenant_shards DROP preferred_az_id;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE tenant_shards ADD preferred_az_id VARCHAR;
|
||||
@@ -14,14 +14,14 @@ use metrics::{BuildInfo, NeonMetrics};
|
||||
use pageserver_api::controller_api::{
|
||||
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
|
||||
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
|
||||
TenantCreateRequest,
|
||||
ShardsPreferredAzsRequest, TenantCreateRequest,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -369,6 +369,23 @@ async fn handle_tenant_timeline_detach_ancestor(
|
||||
json_response(StatusCode::OK, res)
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_block_unblock_gc(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
service
|
||||
.tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -539,6 +556,17 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
|
||||
async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
let node_status = state.service.get_node_shards(node_id).await?;
|
||||
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
|
||||
async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -688,6 +716,18 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_update_preferred_azs(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
|
||||
let state = get_state(&req);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state.service.update_shards_preferred_azs(azs_req).await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -1097,6 +1137,13 @@ pub fn make_router(
|
||||
.get("/control/v1/node/:node_id", |r| {
|
||||
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
|
||||
})
|
||||
.get("/control/v1/node/:node_id/shards", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_node_shards,
|
||||
RequestName("control_v1_node_describe"),
|
||||
)
|
||||
})
|
||||
.get("/control/v1/leader", |r| {
|
||||
named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
|
||||
})
|
||||
@@ -1174,6 +1221,13 @@ pub fn make_router(
|
||||
RequestName("control_v1_tenant_policy"),
|
||||
)
|
||||
})
|
||||
.put("/control/v1/preferred_azs", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_update_preferred_azs,
|
||||
RequestName("control_v1_preferred_azs"),
|
||||
)
|
||||
})
|
||||
.put("/control/v1/step_down", |r| {
|
||||
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
|
||||
})
|
||||
@@ -1255,6 +1309,26 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
|
||||
RequestName("v1_tenant_timeline_block_unblock_gc"),
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
|
||||
RequestName("v1_tenant_timeline_block_unblock_gc"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// Tenant detail GET passthrough to shard zero:
|
||||
.get("/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(
|
||||
|
||||
@@ -7,7 +7,10 @@ use pageserver_api::{
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api::{Client, Result};
|
||||
use pageserver_client::{
|
||||
mgmt_api::{Client, Result},
|
||||
BlockUnblock,
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -258,6 +261,24 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<()> {
|
||||
// measuring these makes no sense because we synchronize with the gc loop and remote
|
||||
// storage on block_gc so there should be huge outliers
|
||||
measured_request!(
|
||||
"timeline_block_unblock_gc",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
|
||||
measured_request!(
|
||||
"utilization",
|
||||
|
||||
@@ -105,6 +105,7 @@ pub(crate) enum DatabaseOperation {
|
||||
ListMetadataHealthOutdated,
|
||||
GetLeader,
|
||||
UpdateLeader,
|
||||
SetPreferredAzs,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -664,6 +665,33 @@ impl Persistence {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn set_tenant_shard_preferred_azs(
|
||||
&self,
|
||||
preferred_azs: Vec<(TenantShardId, String)>,
|
||||
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
|
||||
let mut shards_updated = Vec::default();
|
||||
|
||||
for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set(preferred_az_id.eq(preferred_az))
|
||||
.execute(conn)?;
|
||||
|
||||
if updated == 1 {
|
||||
shards_updated.push((*tenant_shard_id, preferred_az.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(shards_updated)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
|
||||
@@ -1050,6 +1078,11 @@ pub(crate) struct TenantShardPersistence {
|
||||
pub(crate) config: String,
|
||||
#[serde(default)]
|
||||
pub(crate) scheduling_policy: String,
|
||||
|
||||
// Hint that we should attempt to schedule this tenant shard the given
|
||||
// availability zone in order to minimise the chances of cross-AZ communication
|
||||
// with compute.
|
||||
pub(crate) preferred_az_id: Option<String>,
|
||||
}
|
||||
|
||||
impl TenantShardPersistence {
|
||||
|
||||
@@ -41,6 +41,7 @@ diesel::table! {
|
||||
splitting -> Int2,
|
||||
config -> Text,
|
||||
scheduling_policy -> Varchar,
|
||||
preferred_az_id -> Nullable<Varchar>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
@@ -41,7 +41,8 @@ use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
|
||||
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
|
||||
NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy, ShardSchedulingPolicy,
|
||||
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, TenantCreateRequest,
|
||||
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
|
||||
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
@@ -68,7 +69,7 @@ use pageserver_api::{
|
||||
ValidateResponse, ValidateResponseTenant,
|
||||
},
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
@@ -116,7 +117,9 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
pub const MAX_OFFLINE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// How long a node may be unresponsive to heartbeats during start up before we declare it
|
||||
/// offline. This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's
|
||||
/// offline.
|
||||
///
|
||||
/// This is much more lenient than [`MAX_OFFLINE_INTERVAL_DEFAULT`] since the pageserver's
|
||||
/// handling of the re-attach response may take a long time and blocks heartbeats from
|
||||
/// being handled on the pageserver side.
|
||||
pub const MAX_WARMING_UP_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
|
||||
@@ -139,6 +142,7 @@ enum TenantOperations {
|
||||
AttachHook,
|
||||
TimelineArchivalConfig,
|
||||
TimelineDetachAncestor,
|
||||
TimelineGcBlockUnblock,
|
||||
}
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
@@ -353,6 +357,12 @@ impl From<DatabaseError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
enum InitialShardScheduleOutcome {
|
||||
Scheduled(TenantCreateResponseShard),
|
||||
NotScheduled,
|
||||
ShardScheduleError(ScheduleError),
|
||||
}
|
||||
|
||||
pub struct Service {
|
||||
inner: Arc<std::sync::RwLock<ServiceState>>,
|
||||
config: Config,
|
||||
@@ -442,7 +452,7 @@ struct ShardSplitParams {
|
||||
// When preparing for a shard split, we may either choose to proceed with the split,
|
||||
// or find that the work is already done and return NoOp.
|
||||
enum ShardSplitAction {
|
||||
Split(ShardSplitParams),
|
||||
Split(Box<ShardSplitParams>),
|
||||
NoOp(TenantShardSplitResponse),
|
||||
}
|
||||
|
||||
@@ -1452,6 +1462,7 @@ impl Service {
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
|
||||
.unwrap(),
|
||||
preferred_az_id: None,
|
||||
};
|
||||
|
||||
match self.persistence.insert_tenant_shards(vec![tsp]).await {
|
||||
@@ -2023,6 +2034,7 @@ impl Service {
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
|
||||
.unwrap(),
|
||||
preferred_az_id: None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -2046,99 +2058,87 @@ impl Service {
|
||||
};
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
let mut schedule_error = None;
|
||||
let mut response_shards = Vec::new();
|
||||
for tenant_shard_id in create_ids {
|
||||
tracing::info!("Creating shard {tenant_shard_id}...");
|
||||
|
||||
let (waiters, response_shards) = {
|
||||
let outcome = self
|
||||
.do_initial_shard_scheduling(
|
||||
tenant_shard_id,
|
||||
initial_generation,
|
||||
&create_req.shard_parameters,
|
||||
create_req.config.clone(),
|
||||
placement_policy.clone(),
|
||||
&mut schedule_context,
|
||||
)
|
||||
.await;
|
||||
|
||||
match outcome {
|
||||
InitialShardScheduleOutcome::Scheduled(resp) => response_shards.push(resp),
|
||||
InitialShardScheduleOutcome::NotScheduled => {}
|
||||
InitialShardScheduleOutcome::ShardScheduleError(err) => {
|
||||
schedule_error = Some(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let preferred_azs = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
response_shards
|
||||
.iter()
|
||||
.filter_map(|resp| {
|
||||
let az_id = locked
|
||||
.nodes
|
||||
.get(&resp.node_id)
|
||||
.map(|n| n.get_availability_zone_id().to_string())?;
|
||||
|
||||
Some((resp.shard_id, az_id))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// Note that we persist the preferred AZ for the new shards separately.
|
||||
// In theory, we could "peek" the scheduler to determine where the shard will
|
||||
// land, but the subsequent "real" call into the scheduler might select a different
|
||||
// node. Hence, we do this awkward update to keep things consistent.
|
||||
let updated = self
|
||||
.persistence
|
||||
.set_tenant_shard_preferred_azs(preferred_azs)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to persist preferred az ids: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut response_shards = Vec::new();
|
||||
let mut schcedule_error = None;
|
||||
|
||||
for tenant_shard_id in create_ids {
|
||||
tracing::info!("Creating shard {tenant_shard_id}...");
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
match tenants.entry(tenant_shard_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
tracing::info!(
|
||||
"Tenant shard {tenant_shard_id} already exists while creating"
|
||||
);
|
||||
|
||||
// TODO: schedule() should take an anti-affinity expression that pushes
|
||||
// attached and secondary locations (independently) away frorm those
|
||||
// pageservers also holding a shard for this tenant.
|
||||
|
||||
entry
|
||||
.get_mut()
|
||||
.schedule(scheduler, &mut schedule_context)
|
||||
.map_err(|e| {
|
||||
ApiError::Conflict(format!(
|
||||
"Failed to schedule shard {tenant_shard_id}: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
if let Some(node_id) = entry.get().intent.get_attached() {
|
||||
let generation = entry
|
||||
.get()
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
});
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
let state = entry.insert(TenantShard::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::from_params(
|
||||
tenant_shard_id.shard_number,
|
||||
&create_req.shard_parameters,
|
||||
),
|
||||
placement_policy.clone(),
|
||||
));
|
||||
|
||||
state.generation = initial_generation;
|
||||
state.config = create_req.config.clone();
|
||||
if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
|
||||
schcedule_error = Some(e);
|
||||
}
|
||||
|
||||
// Only include shards in result if we are attaching: the purpose
|
||||
// of the response is to tell the caller where the shards are attached.
|
||||
if let Some(node_id) = state.intent.get_attached() {
|
||||
let generation = state
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
for (tid, az_id) in updated {
|
||||
if let Some(shard) = locked.tenants.get_mut(&tid) {
|
||||
shard.set_preferred_az(az_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we failed to schedule shards, then they are still created in the controller,
|
||||
// but we return an error to the requester to avoid a silent failure when someone
|
||||
// tries to e.g. create a tenant whose placement policy requires more nodes than
|
||||
// are present in the system. We do this here rather than in the above loop, to
|
||||
// avoid situations where we only create a subset of shards in the tenant.
|
||||
if let Some(e) = schcedule_error {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Failed to schedule shard(s): {e}"
|
||||
)));
|
||||
}
|
||||
// If we failed to schedule shards, then they are still created in the controller,
|
||||
// but we return an error to the requester to avoid a silent failure when someone
|
||||
// tries to e.g. create a tenant whose placement policy requires more nodes than
|
||||
// are present in the system. We do this here rather than in the above loop, to
|
||||
// avoid situations where we only create a subset of shards in the tenant.
|
||||
if let Some(e) = schedule_error {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Failed to schedule shard(s): {e}"
|
||||
)));
|
||||
}
|
||||
|
||||
let waiters = tenants
|
||||
let waiters = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
|
||||
.collect::<Vec<_>>();
|
||||
(waiters, response_shards)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
Ok((
|
||||
@@ -2149,6 +2149,78 @@ impl Service {
|
||||
))
|
||||
}
|
||||
|
||||
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
|
||||
/// case of a new tenant and a pre-existing one.
|
||||
async fn do_initial_shard_scheduling(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
initial_generation: Option<Generation>,
|
||||
shard_params: &ShardParameters,
|
||||
config: TenantConfig,
|
||||
placement_policy: PlacementPolicy,
|
||||
schedule_context: &mut ScheduleContext,
|
||||
) -> InitialShardScheduleOutcome {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
match tenants.entry(tenant_shard_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");
|
||||
|
||||
// TODO: schedule() should take an anti-affinity expression that pushes
|
||||
// attached and secondary locations (independently) away frorm those
|
||||
// pageservers also holding a shard for this tenant.
|
||||
|
||||
if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
|
||||
return InitialShardScheduleOutcome::ShardScheduleError(err);
|
||||
}
|
||||
|
||||
if let Some(node_id) = entry.get().intent.get_attached() {
|
||||
let generation = entry
|
||||
.get()
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
})
|
||||
} else {
|
||||
InitialShardScheduleOutcome::NotScheduled
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
let state = entry.insert(TenantShard::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
|
||||
placement_policy,
|
||||
));
|
||||
|
||||
state.generation = initial_generation;
|
||||
state.config = config;
|
||||
if let Err(e) = state.schedule(scheduler, schedule_context) {
|
||||
return InitialShardScheduleOutcome::ShardScheduleError(e);
|
||||
}
|
||||
|
||||
// Only include shards in result if we are attaching: the purpose
|
||||
// of the response is to tell the caller where the shards are attached.
|
||||
if let Some(node_id) = state.intent.get_attached() {
|
||||
let generation = state
|
||||
.generation
|
||||
.expect("Generation is set when in attached mode");
|
||||
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: *node_id,
|
||||
generation: generation.into().unwrap(),
|
||||
})
|
||||
} else {
|
||||
InitialShardScheduleOutcome::NotScheduled
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
|
||||
/// wait for reconciliation to complete before responding.
|
||||
async fn await_waiters(
|
||||
@@ -3126,6 +3198,57 @@ impl Service {
|
||||
}).await?
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<(), ApiError> {
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineGcBlockUnblock,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.tenant_remote_mutation(tenant_id, move |targets| async move {
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
|
||||
async fn do_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<(), ApiError> {
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
client
|
||||
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
self.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(do_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
dir,
|
||||
))
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
|
||||
///
|
||||
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
|
||||
@@ -3511,6 +3634,7 @@ impl Service {
|
||||
is_pending_compute_notification: shard.pending_compute_notification,
|
||||
is_splitting: matches!(shard.splitting, SplitState::Splitting),
|
||||
scheduling_policy: *shard.get_scheduling_policy(),
|
||||
preferred_az_id: shard.preferred_az().map(ToString::to_string),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4114,7 +4238,7 @@ impl Service {
|
||||
let policy = policy.unwrap();
|
||||
let config = config.unwrap();
|
||||
|
||||
Ok(ShardSplitAction::Split(ShardSplitParams {
|
||||
Ok(ShardSplitAction::Split(Box::new(ShardSplitParams {
|
||||
old_shard_count,
|
||||
new_shard_count: ShardCount::new(split_req.new_shard_count),
|
||||
new_stripe_size: split_req.new_stripe_size,
|
||||
@@ -4122,13 +4246,13 @@ impl Service {
|
||||
policy,
|
||||
config,
|
||||
shard_ident,
|
||||
}))
|
||||
})))
|
||||
}
|
||||
|
||||
async fn do_tenant_shard_split(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
params: ShardSplitParams,
|
||||
params: Box<ShardSplitParams>,
|
||||
) -> Result<(TenantShardSplitResponse, Vec<ReconcilerWaiter>), ApiError> {
|
||||
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
|
||||
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
|
||||
@@ -4144,7 +4268,7 @@ impl Service {
|
||||
policy,
|
||||
config,
|
||||
shard_ident,
|
||||
} = params;
|
||||
} = *params;
|
||||
|
||||
// Drop any secondary locations: pageservers do not support splitting these, and in any case the
|
||||
// end-state for a split tenant will usually be to have secondary locations on different nodes.
|
||||
@@ -4214,9 +4338,10 @@ impl Service {
|
||||
config: serde_json::to_string(&config).unwrap(),
|
||||
splitting: SplitState::Splitting,
|
||||
|
||||
// Scheduling policies do not carry through to children
|
||||
// Scheduling policies and preferred AZ do not carry through to children
|
||||
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
|
||||
.unwrap(),
|
||||
preferred_az_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4336,6 +4461,47 @@ impl Service {
|
||||
let (response, child_locations, waiters) =
|
||||
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
|
||||
|
||||
// Now that we have scheduled the child shards, attempt to set their preferred AZ
|
||||
// to that of the pageserver they've been attached on.
|
||||
let preferred_azs = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
child_locations
|
||||
.iter()
|
||||
.filter_map(|(tid, node_id, _stripe_size)| {
|
||||
let az_id = locked
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.map(|n| n.get_availability_zone_id().to_string())?;
|
||||
|
||||
Some((*tid, az_id))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let updated = self
|
||||
.persistence
|
||||
.set_tenant_shard_preferred_azs(preferred_azs)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to persist preferred az ids: {err}"
|
||||
))
|
||||
});
|
||||
|
||||
match updated {
|
||||
Ok(updated) => {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for (tid, az_id) in updated {
|
||||
if let Some(shard) = locked.tenants.get_mut(&tid) {
|
||||
shard.set_preferred_az(az_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("Failed to persist preferred AZs after split: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Send compute notifications for all the new shards
|
||||
let mut failed_notifications = Vec::new();
|
||||
for (child_id, child_ps, stripe_size) in child_locations {
|
||||
@@ -4810,6 +4976,45 @@ impl Service {
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_node_shards(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<NodeShardResponse, ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut shards = Vec::new();
|
||||
for (tid, tenant) in locked.tenants.iter() {
|
||||
let is_intended_secondary = match (
|
||||
tenant.intent.get_attached() == &Some(node_id),
|
||||
tenant.intent.get_secondary().contains(&node_id),
|
||||
) {
|
||||
(true, true) => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"{} attached as primary+secondary on the same node",
|
||||
tid
|
||||
)))
|
||||
}
|
||||
(true, false) => Some(false),
|
||||
(false, true) => Some(true),
|
||||
(false, false) => None,
|
||||
};
|
||||
let is_observed_secondary = if let Some(ObservedStateLocation { conf: Some(conf) }) =
|
||||
tenant.observed.locations.get(&node_id)
|
||||
{
|
||||
Some(conf.secondary_conf.is_some())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if is_intended_secondary.is_some() || is_observed_secondary.is_some() {
|
||||
shards.push(NodeShard {
|
||||
tenant_shard_id: *tid,
|
||||
is_intended_secondary,
|
||||
is_observed_secondary,
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(NodeShardResponse { node_id, shards })
|
||||
}
|
||||
|
||||
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
|
||||
self.persistence.get_leader().await
|
||||
}
|
||||
@@ -6497,4 +6702,35 @@ impl Service {
|
||||
) -> Result<(), DatabaseError> {
|
||||
self.persistence.safekeeper_upsert(record).await
|
||||
}
|
||||
|
||||
pub(crate) async fn update_shards_preferred_azs(
|
||||
&self,
|
||||
req: ShardsPreferredAzsRequest,
|
||||
) -> Result<ShardsPreferredAzsResponse, ApiError> {
|
||||
let preferred_azs = req.preferred_az_ids.into_iter().collect::<Vec<_>>();
|
||||
let updated = self
|
||||
.persistence
|
||||
.set_tenant_shard_preferred_azs(preferred_azs)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to persist preferred AZs: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut updated_in_mem_and_db = Vec::default();
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for (tid, az_id) in updated {
|
||||
let shard = locked.tenants.get_mut(&tid);
|
||||
if let Some(shard) = shard {
|
||||
shard.set_preferred_az(az_id);
|
||||
updated_in_mem_and_db.push(tid);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ShardsPreferredAzsResponse {
|
||||
updated: updated_in_mem_and_db,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,6 +140,10 @@ pub(crate) struct TenantShard {
|
||||
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
|
||||
// be set to a non-active state to avoid making changes while the issue is fixed.
|
||||
scheduling_policy: ShardSchedulingPolicy,
|
||||
|
||||
// We should attempt to schedule this shard in the provided AZ to
|
||||
// decrease chances of cross-AZ compute.
|
||||
preferred_az_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug, Serialize)]
|
||||
@@ -463,6 +467,7 @@ impl TenantShard {
|
||||
last_error: Arc::default(),
|
||||
pending_compute_notification: false,
|
||||
scheduling_policy: ShardSchedulingPolicy::default(),
|
||||
preferred_az_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1297,6 +1302,7 @@ impl TenantShard {
|
||||
pending_compute_notification: false,
|
||||
delayed_reconcile: false,
|
||||
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
|
||||
preferred_az_id: tsp.preferred_az_id,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1312,8 +1318,17 @@ impl TenantShard {
|
||||
config: serde_json::to_string(&self.config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
|
||||
preferred_az_id: self.preferred_az_id.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn preferred_az(&self) -> Option<&str> {
|
||||
self.preferred_az_id.as_deref()
|
||||
}
|
||||
|
||||
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
|
||||
self.preferred_az_id = Some(preferred_az_id);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Functionality for finding and purging garbage, as in "garbage collection". Garbage means
|
||||
//! S3 objects which are either not referenced by any metadata, or are referenced by a
|
||||
//! control plane tenant/timeline in a deleted state.
|
||||
//! Functionality for finding and purging garbage, as in "garbage collection".
|
||||
//!
|
||||
//! Garbage means S3 objects which are either not referenced by any metadata,
|
||||
//! or are referenced by a control plane tenant/timeline in a deleted state.
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
|
||||
@@ -74,7 +74,9 @@ pub async fn stream_tenant_shards<'a>(
|
||||
}
|
||||
|
||||
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
|
||||
/// using a listing. The listing is done before the stream is built, so that this
|
||||
/// using a listing.
|
||||
///
|
||||
/// The listing is done before the stream is built, so that this
|
||||
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
||||
pub async fn stream_tenant_timelines<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
|
||||
@@ -440,9 +440,10 @@ async fn gc_ancestor(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Physical garbage collection: removing unused S3 objects. This is distinct from the garbage collection
|
||||
/// done inside the pageserver, which operates at a higher level (keys, layers). This type of garbage collection
|
||||
/// is about removing:
|
||||
/// Physical garbage collection: removing unused S3 objects.
|
||||
///
|
||||
/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
|
||||
/// (keys, layers). This type of garbage collection is about removing:
|
||||
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
|
||||
/// uploading a layer and uploading an index)
|
||||
/// - Index objects from historic generations
|
||||
|
||||
@@ -140,6 +140,14 @@ class TenantId(Id):
|
||||
return self.id.hex()
|
||||
|
||||
|
||||
class NodeId(Id):
|
||||
def __repr__(self) -> str:
|
||||
return f'`NodeId("{self.id.hex()}")'
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.id.hex()
|
||||
|
||||
|
||||
class TimelineId(Id):
|
||||
def __repr__(self) -> str:
|
||||
return f'TimelineId("{self.id.hex()}")'
|
||||
|
||||
@@ -62,7 +62,7 @@ from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures import overlayfs
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
@@ -2560,7 +2560,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
|
||||
def tenant_describe(self, tenant_id: TenantId):
|
||||
"""
|
||||
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int}
|
||||
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int, preferred_az_id: str}
|
||||
"""
|
||||
response = self.request(
|
||||
"GET",
|
||||
@@ -2570,6 +2570,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def nodes(self):
|
||||
"""
|
||||
:return: list of {"id": ""}
|
||||
"""
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/node",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def node_shards(self, node_id: NodeId):
|
||||
"""
|
||||
:return: list of {"shard_id": "", "is_secondary": bool}
|
||||
"""
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/node/{node_id}/shards",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def tenant_shard_split(
|
||||
self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
|
||||
) -> list[TenantShardId]:
|
||||
@@ -2886,6 +2910,17 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
return None
|
||||
raise e
|
||||
|
||||
def set_preferred_azs(self, preferred_azs: dict[TenantShardId, str]) -> list[TenantShardId]:
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/preferred_azs",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
json={str(tid): az for tid, az in preferred_azs.items()},
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
|
||||
|
||||
def __enter__(self) -> "NeonStorageController":
|
||||
return self
|
||||
|
||||
|
||||
@@ -50,6 +50,19 @@ class SafekeeperMetrics(Metrics):
|
||||
).value
|
||||
|
||||
|
||||
@dataclass
|
||||
class TermBumpResponse:
|
||||
previous_term: int
|
||||
current_term: int
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
|
||||
return TermBumpResponse(
|
||||
previous_term=d["previous_term"],
|
||||
current_term=d["current_term"],
|
||||
)
|
||||
|
||||
|
||||
class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
HTTPError = requests.HTTPError
|
||||
|
||||
@@ -252,6 +265,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def term_bump(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
term: Optional[int],
|
||||
) -> TermBumpResponse:
|
||||
body = {}
|
||||
if term is not None:
|
||||
body["term"] = term
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
|
||||
json=body,
|
||||
)
|
||||
res.raise_for_status()
|
||||
return TermBumpResponse.from_json(res.json())
|
||||
|
||||
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
|
||||
|
||||
@@ -1552,6 +1552,12 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
|
||||
literal_shard_count = 1 if shard_count is None else shard_count
|
||||
assert len(describe["shards"]) == literal_shard_count
|
||||
|
||||
nodes = env.storage_controller.nodes()
|
||||
assert len(nodes) == 2
|
||||
describe1 = env.storage_controller.node_shards(nodes[0]["id"])
|
||||
describe2 = env.storage_controller.node_shards(nodes[1]["id"])
|
||||
assert len(describe1["shards"]) + len(describe2["shards"]) == literal_shard_count
|
||||
|
||||
# Check the data is still there: this implicitly proves that we recovered generation numbers
|
||||
# properly, for the timeline which was written to after a generation bump.
|
||||
for timeline, branch, expect_rows in [
|
||||
@@ -2512,3 +2518,55 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
del d[key]
|
||||
|
||||
return compared[0] == compared[1]
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
|
||||
def assign_az(ps_cfg):
|
||||
az = f"az-{ps_cfg['id']}"
|
||||
ps_cfg["availability_zone"] = az
|
||||
|
||||
neon_env_builder.pageserver_config_override = assign_az
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tids = [TenantId.generate() for _ in range(0, 3)]
|
||||
for tid in tids:
|
||||
env.storage_controller.tenant_create(tid)
|
||||
|
||||
shards = env.storage_controller.tenant_describe(tid)["shards"]
|
||||
assert len(shards) == 1
|
||||
attached_to = shards[0]["node_attached"]
|
||||
expected_az = env.get_pageserver(attached_to).az_id
|
||||
|
||||
assert shards[0]["preferred_az_id"] == expected_az
|
||||
|
||||
updated = env.storage_controller.set_preferred_azs(
|
||||
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
|
||||
)
|
||||
|
||||
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
|
||||
|
||||
for tid in tids:
|
||||
shards = env.storage_controller.tenant_describe(tid)["shards"]
|
||||
assert len(shards) == 1
|
||||
assert shards[0]["preferred_az_id"] == "foo"
|
||||
|
||||
# Generate a layer to avoid shard split handling on ps from tripping
|
||||
# up on debug assert.
|
||||
timeline_id = TimelineId.generate()
|
||||
env.neon_cli.create_timeline("bar", tids[0], timeline_id)
|
||||
|
||||
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
|
||||
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
|
||||
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
|
||||
assert len(shards) == 2
|
||||
for shard in shards:
|
||||
attached_to = shard["node_attached"]
|
||||
expected_az = env.get_pageserver(attached_to).az_id
|
||||
assert shard["preferred_az_id"] == expected_az
|
||||
|
||||
@@ -1,17 +1,32 @@
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
LogCursor,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_timeline_detail_404
|
||||
|
||||
|
||||
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("sharded", [True, False])
|
||||
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
|
||||
neon_env_builder.num_pageservers = 2 if sharded else 1
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
|
||||
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
|
||||
initial_tenant_shard_count=2 if sharded else None,
|
||||
)
|
||||
ps = env.pageserver
|
||||
http = ps.http_client()
|
||||
|
||||
if sharded:
|
||||
http = env.storage_controller.pageserver_api()
|
||||
else:
|
||||
http = env.pageserver.http_client()
|
||||
|
||||
pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))
|
||||
|
||||
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
|
||||
|
||||
@@ -22,9 +37,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_before = http.tenant_status(env.initial_tenant)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line)
|
||||
|
||||
assert ps.log_contains(gc_skipped_line, offset) is None
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
pss.assert_log_does_not_contain(gc_skipped_line)
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, foo_branch)
|
||||
|
||||
@@ -34,34 +48,78 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
ps.restart()
|
||||
ps.quiesce_tenants()
|
||||
pss.restart()
|
||||
pss.quiesce_tenants()
|
||||
|
||||
_, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
|
||||
pss.assert_log_contains(init_gc_skipped)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
# deletion unblocks gc
|
||||
http.timeline_delete(env.initial_tenant, foo_branch)
|
||||
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
# removing the manual block also unblocks gc
|
||||
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
|
||||
|
||||
def wait_for_another_gc_round():
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScrollableLog:
|
||||
pageserver: NeonPageserver
|
||||
offset: Optional[LogCursor]
|
||||
|
||||
def assert_log_contains(self, what: str):
|
||||
msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
|
||||
old = self.offset
|
||||
self.offset = offset
|
||||
log.info(f"{old} -> {offset}: {msg}")
|
||||
|
||||
def assert_log_does_not_contain(self, what: str):
|
||||
assert self.pageserver.log_contains(what) is None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManyPageservers:
|
||||
many: List[ScrollableLog]
|
||||
|
||||
def assert_log_contains(self, what: str):
|
||||
for one in self.many:
|
||||
one.assert_log_contains(what)
|
||||
|
||||
def assert_log_does_not_contain(self, what: str):
|
||||
for one in self.many:
|
||||
one.assert_log_does_not_contain(what)
|
||||
|
||||
def restart(self):
|
||||
def do_restart(x: ScrollableLog):
|
||||
x.pageserver.restart()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
|
||||
rt.map(do_restart, self.many)
|
||||
rt.shutdown(wait=True)
|
||||
|
||||
def quiesce_tenants(self):
|
||||
def do_quiesce(x: ScrollableLog):
|
||||
x.pageserver.quiesce_tenants()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
|
||||
rt.map(do_quiesce, self.many)
|
||||
rt.shutdown(wait=True)
|
||||
|
||||
@@ -1057,6 +1057,24 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint.start()
|
||||
|
||||
|
||||
# Try restarting endpoint immediately after xlog switch.
|
||||
# https://github.com/neondatabase/neon/issues/8911
|
||||
def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
endpoint.safe_psql("create table t (i int)")
|
||||
|
||||
endpoint.safe_psql("SELECT pg_switch_wal()")
|
||||
|
||||
# we want immediate shutdown to have endpoint restart on xlog switch record,
|
||||
# so prevent shutdown checkpoint.
|
||||
endpoint.stop(mode="immediate")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint.safe_psql("SELECT 'works'")
|
||||
|
||||
|
||||
# Context manager which logs passed time on exit.
|
||||
class DurationLogger:
|
||||
def __init__(self, desc):
|
||||
@@ -2176,6 +2194,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
|
||||
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
|
||||
|
||||
|
||||
def test_term_bump(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
# initialize safekeeper
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
|
||||
http_cli = env.safekeepers[0].http_client()
|
||||
|
||||
# check that bump up to specific term works
|
||||
curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
|
||||
bump_to = curr_term + 3
|
||||
res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
|
||||
log.info(f"bump to {bump_to} res: {res}")
|
||||
assert res.current_term >= bump_to
|
||||
|
||||
# check that bump to none increments current term
|
||||
res = http_cli.term_bump(tenant_id, timeline_id, None)
|
||||
log.info(f"bump to None res: {res}")
|
||||
assert res.current_term > bump_to
|
||||
assert res.current_term > res.previous_term
|
||||
|
||||
# check that bumping doesn't work downward
|
||||
res = http_cli.term_bump(tenant_id, timeline_id, 2)
|
||||
log.info(f"bump to 2 res: {res}")
|
||||
assert res.current_term > bump_to
|
||||
assert res.current_term == res.previous_term
|
||||
|
||||
# check that this doesn't kill endpoint because last WAL flush was his and
|
||||
# thus its basebackup is still good
|
||||
endpoint.safe_psql("insert into t values (1, 'payload')")
|
||||
|
||||
|
||||
# Test disables periodic pushes from safekeeper to the broker and checks that
|
||||
# pageserver can still discover safekeepers with discovery requests.
|
||||
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -8,6 +8,8 @@ version = "0.1.0"
|
||||
description = "workspace-hack package, managed by hakari"
|
||||
# You can choose to publish this crate: see https://docs.rs/cargo-hakari/latest/cargo_hakari/publishing.
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
# The parts of the file between the BEGIN HAKARI SECTION and END HAKARI SECTION comments
|
||||
# are managed by hakari.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user