Merge commit 'cec0543b5' into problame/standby-horizon-leases

Christian Schwarz, 2025-08-06 17:47:10 +02:00
43 changed files with 1216 additions and 266 deletions

Cargo.lock (generated)

@@ -4294,6 +4294,7 @@ dependencies = [
  "humantime-serde",
  "pageserver_api",
  "pageserver_client",
+ "pageserver_client_grpc",
  "pageserver_page_api",
  "rand 0.8.5",
  "reqwest",
@@ -4323,6 +4324,7 @@ dependencies = [
  "pageserver_api",
  "postgres_ffi",
  "remote_storage",
+ "serde",
  "serde_json",
  "svg_fmt",
  "thiserror 1.0.69",
@@ -4499,6 +4501,7 @@ name = "pageserver_client_grpc"
  version = "0.1.0"
  dependencies = [
  "anyhow",
+ "arc-swap",
  "bytes",
  "compute_api",
  "futures",


@@ -262,6 +262,7 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
 pageserver = { path = "./pageserver" }
 pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
 pageserver_client = { path = "./pageserver/client" }
+pageserver_client_grpc = { path = "./pageserver/client_grpc" }
 pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
 pageserver_page_api = { path = "./pageserver/page_api" }
 postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }


@@ -46,11 +46,14 @@ stateDiagram-v2
     Configuration --> Failed : Failed to configure the compute
     Configuration --> Running : Compute has been configured
     Empty --> Init : Compute spec is immediately available
-    Empty --> TerminationPending : Requested termination
+    Empty --> TerminationPendingFast : Requested termination
+    Empty --> TerminationPendingImmediate : Requested termination
     Init --> Failed : Failed to start Postgres
     Init --> Running : Started Postgres
-    Running --> TerminationPending : Requested termination
-    TerminationPending --> Terminated : Terminated compute
+    Running --> TerminationPendingFast : Requested termination
+    Running --> TerminationPendingImmediate : Requested termination
+    TerminationPendingFast --> Terminated : Terminated compute with 30s delay for cplane to inspect status
+    TerminationPendingImmediate --> Terminated : Terminated compute immediately
     Failed --> [*] : Compute exited
     Terminated --> [*] : Compute exited
```


@@ -961,14 +961,20 @@ impl ComputeNode {
             None
         };

-        let mut delay_exit = false;
         let mut state = self.state.lock().unwrap();
         state.terminate_flush_lsn = lsn;
-        if let ComputeStatus::TerminationPending { mode } = state.status {
+
+        let delay_exit = state.status == ComputeStatus::TerminationPendingFast;
+        if state.status == ComputeStatus::TerminationPendingFast
+            || state.status == ComputeStatus::TerminationPendingImmediate
+        {
+            info!(
+                "Changing compute status from {} to {}",
+                state.status,
+                ComputeStatus::Terminated
+            );
             state.status = ComputeStatus::Terminated;
             self.state_changed.notify_all();
-            // we were asked to terminate gracefully, don't exit to avoid restart
-            delay_exit = mode == compute_api::responses::TerminateMode::Fast
         }
         drop(state);
@@ -1810,6 +1816,8 @@ impl ComputeNode {
             tls_config,
         )?;

+        self.pg_reload_conf()?;
+
         if !spec.skip_pg_catalog_updates {
             let max_concurrent_connections = spec.reconfigure_concurrency;
             // Temporarily reset max_cluster_size in config
@@ -1829,10 +1837,9 @@ impl ComputeNode {
                 Ok(())
             })?;
-
-            self.pg_reload_conf()?;
         }
+        self.pg_reload_conf()?;

         let unknown_op = "unknown".to_string();
         let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
         info!(
@@ -1905,7 +1912,8 @@ impl ComputeNode {
                 // exit loop
                 ComputeStatus::Failed
-                | ComputeStatus::TerminationPending { .. }
+                | ComputeStatus::TerminationPendingFast
+                | ComputeStatus::TerminationPendingImmediate
                 | ComputeStatus::Terminated => break 'cert_update,
                 // wait

@@ -70,7 +70,7 @@ impl ComputeNode {
             }
         };
         let row = match client
-            .query_one("select * from get_prewarm_info()", &[])
+            .query_one("select * from neon.get_prewarm_info()", &[])
             .await
         {
             Ok(row) => row,
@@ -146,7 +146,7 @@ impl ComputeNode {
         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
             .await
             .context("connecting to postgres")?
-            .query_one("select prewarm_local_cache($1)", &[&uncompressed])
+            .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
             .await
             .context("loading LFC state into postgres")
             .map(|_| ())
@@ -196,7 +196,7 @@ impl ComputeNode {
         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
             .await
             .context("connecting to postgres")?
-            .query_one("select get_local_cache_state()", &[])
+            .query_one("select neon.get_local_cache_state()", &[])
             .await
             .context("querying LFC state")?
             .try_get::<usize, &[u8]>(0)


@@ -371,9 +371,28 @@ paths:
       summary: Terminate Postgres and wait for it to exit
       description: ""
       operationId: terminate
+      parameters:
+        - name: mode
+          in: query
+          description: "Terminate mode: fast (wait 30s before returning) and immediate"
+          required: false
+          schema:
+            type: string
+            enum: ["fast", "immediate"]
+            default: fast
       responses:
         200:
           description: Result
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/TerminateResponse"
+        201:
+          description: Result if compute is already terminated
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/TerminateResponse"
         412:
           description: "wrong state"
           content:
@@ -530,11 +549,14 @@ components:
         type: string
         enum:
           - empty
-          - init
-          - failed
-          - running
           - configuration_pending
+          - init
+          - running
           - configuration
+          - failed
+          - termination_pending_fast
+          - termination_pending_immediate
+          - terminated
         example: running

     ExtensionInstallRequest:
@@ -660,6 +682,17 @@ components:
           description: Role name.
           example: "neon"
+    TerminateResponse:
+      type: object
+      required:
+        - lsn
+      properties:
+        lsn:
+          type: string
+          nullable: true
+          description: "last WAL flush LSN"
+          example: "0/028F10D8"
     SetRoleGrantsResponse:
       type: object
       required:
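For illustration only, here is a minimal client-side sketch of the extended terminate endpoint. It is not part of this change: it assumes the operation is exposed as a POST and that compute_ctl's external HTTP API is reachable at `http://localhost:3080` (both assumptions); the response struct mirrors the `TerminateResponse` schema above.

```rust
// Hedged sketch: base URL, port, and HTTP method are assumptions, not from this diff.
#[derive(serde::Deserialize, Debug)]
struct TerminateResponse {
    // Nullable per the schema above: the flush LSN may be absent (e.g. an Empty compute).
    lsn: Option<String>,
}

async fn terminate_compute(mode: &str) -> anyhow::Result<()> {
    // `mode` is "fast" (default, waits 30s before returning) or "immediate".
    let url = format!("http://localhost:3080/terminate?mode={mode}");
    let resp = reqwest::Client::new().post(&url).send().await?;
    match resp.status().as_u16() {
        // 200: termination initiated; 201: compute was already terminated.
        200 | 201 => {
            let body: TerminateResponse = resp.json().await?;
            println!("terminated, last WAL flush LSN: {:?}", body.lsn);
        }
        412 => println!("compute is in the wrong state for termination"),
        code => anyhow::bail!("unexpected status {code}"),
    }
    Ok(())
}
```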


@@ -3,7 +3,7 @@ use crate::http::JsonResponse;
 use axum::extract::State;
 use axum::response::Response;
 use axum_extra::extract::OptionalQuery;
-use compute_api::responses::{ComputeStatus, TerminateResponse};
+use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
 use http::StatusCode;
 use serde::Deserialize;
 use std::sync::Arc;
@@ -12,7 +12,7 @@ use tracing::info;
 #[derive(Deserialize, Default)]
 pub struct TerminateQuery {
-    mode: compute_api::responses::TerminateMode,
+    mode: TerminateMode,
 }

 /// Terminate the compute.
@@ -24,16 +24,16 @@ pub(in crate::http) async fn terminate(
     {
         let mut state = compute.state.lock().unwrap();
         if state.status == ComputeStatus::Terminated {
-            return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
+            let response = TerminateResponse {
+                lsn: state.terminate_flush_lsn,
+            };
+            return JsonResponse::success(StatusCode::CREATED, response);
         }
         if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
             return JsonResponse::invalid_status(state.status);
         }
-        state.set_status(
-            ComputeStatus::TerminationPending { mode },
-            &compute.state_changed,
-        );
+        state.set_status(mode.into(), &compute.state_changed);
     }
     forward_termination_signal(false);


@@ -1,3 +1,16 @@
+-- On December 8th, 2023, an engineering escalation (INC-110) was opened after
+-- it was found that BYPASSRLS was being applied to all roles.
+--
+-- PR that introduced the issue: https://github.com/neondatabase/neon/pull/5657
+-- Subsequent commit on main: https://github.com/neondatabase/neon/commit/ad99fa5f0393e2679e5323df653c508ffa0ac072
+--
+-- NOBYPASSRLS and INHERIT are the defaults for a Postgres role, but because it
+-- isn't easy to know if a Postgres cluster is affected by the issue, we need to
+-- keep the migration around for a long time, if not indefinitely, so any
+-- cluster can be fixed.
+--
+-- Branching is the gift that keeps on giving...
 DO $$
 DECLARE
     role_name text;


@@ -85,7 +85,8 @@ impl ComputeMonitor {
         if matches!(
             compute_status,
             ComputeStatus::Terminated
-                | ComputeStatus::TerminationPending { .. }
+                | ComputeStatus::TerminationPendingFast
+                | ComputeStatus::TerminationPendingImmediate
                 | ComputeStatus::Failed
         ) {
             info!(


@@ -923,7 +923,8 @@ impl Endpoint {
             ComputeStatus::Empty
             | ComputeStatus::ConfigurationPending
             | ComputeStatus::Configuration
-            | ComputeStatus::TerminationPending { .. }
+            | ComputeStatus::TerminationPendingFast
+            | ComputeStatus::TerminationPendingImmediate
             | ComputeStatus::Terminated => {
                 bail!("unexpected compute status: {:?}", state.status)
             }


@@ -121,6 +121,15 @@ pub enum TerminateMode {
     Immediate,
 }

+impl From<TerminateMode> for ComputeStatus {
+    fn from(mode: TerminateMode) -> Self {
+        match mode {
+            TerminateMode::Fast => ComputeStatus::TerminationPendingFast,
+            TerminateMode::Immediate => ComputeStatus::TerminationPendingImmediate,
+        }
+    }
+}
+
 #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "snake_case")]
 pub enum ComputeStatus {
@@ -141,7 +150,9 @@ pub enum ComputeStatus {
     // control-plane to terminate it.
     Failed,
     // Termination requested
-    TerminationPending { mode: TerminateMode },
+    TerminationPendingFast,
+    // Termination requested, without waiting 30s before returning from /terminate
+    TerminationPendingImmediate,
     // Terminated Postgres
     Terminated,
 }
@@ -160,7 +171,10 @@ impl Display for ComputeStatus {
             ComputeStatus::Running => f.write_str("running"),
             ComputeStatus::Configuration => f.write_str("configuration"),
             ComputeStatus::Failed => f.write_str("failed"),
-            ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
+            ComputeStatus::TerminationPendingFast => f.write_str("termination-pending-fast"),
+            ComputeStatus::TerminationPendingImmediate => {
+                f.write_str("termination-pending-immediate")
+            }
             ComputeStatus::Terminated => f.write_str("terminated"),
         }
     }
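As a quick illustration (not part of the diff), the conversion and the Display/serde names added above line up as follows; this sketch uses only items visible in this file.

```rust
// Minimal sketch using only the types shown above.
use compute_api::responses::{ComputeStatus, TerminateMode};

fn demo() {
    // The new From impl maps each terminate mode to its pending status.
    let fast: ComputeStatus = TerminateMode::Fast.into();
    let immediate: ComputeStatus = TerminateMode::Immediate.into();
    assert_eq!(fast, ComputeStatus::TerminationPendingFast);
    assert_eq!(immediate, ComputeStatus::TerminationPendingImmediate);

    // Display uses kebab-case, while the snake_case serde rename produces the
    // "termination_pending_fast" / "termination_pending_immediate" strings listed
    // in the OpenAPI enum earlier in this commit.
    assert_eq!(fast.to_string(), "termination-pending-fast");
}
```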


@@ -428,6 +428,12 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
         shard_number: 0,
     };

+    let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
+        should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
+        sent_bytes: 0,
+        last_recorded_time_us: 0,
+    };
+
     crate::bindings::WalproposerShmemState {
         propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
         donor_name: [0; 64],
@@ -441,6 +447,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
         num_shards: 0,
         replica_promote: false,
         min_ps_feedback: empty_feedback,
+        wal_rate_limiter: empty_wal_rate_limiter,
     }
 }


@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::error::Error as _;
 use std::time::Duration;
@@ -251,6 +251,70 @@ impl Client {
         Ok(())
     }
pub async fn tenant_timeline_compact(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_image_layer_creation: bool,
must_force_image_layer_creation: bool,
scheduled: bool,
wait_until_done: bool,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
if force_image_layer_creation {
path.query_pairs_mut()
.append_pair("force_image_layer_creation", "true");
}
if must_force_image_layer_creation {
path.query_pairs_mut()
.append_pair("must_force_image_layer_creation", "true");
}
if scheduled {
path.query_pairs_mut().append_pair("scheduled", "true");
}
if wait_until_done {
path.query_pairs_mut()
.append_pair("wait_until_scheduled_compaction_done", "true");
path.query_pairs_mut()
.append_pair("wait_until_uploaded", "true");
}
self.request(Method::PUT, path, ()).await?;
Ok(())
}
/* BEGIN_HADRON */
pub async fn tenant_timeline_describe(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineInfo> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("include-image-consistent-lsn", "true");
let response: reqwest::Response = self.request(Method::GET, path, ()).await?;
let body = response.json().await.map_err(Error::ReceiveBody)?;
Ok(body)
}
pub async fn list_tenant_visible_size(&self) -> Result<BTreeMap<TenantShardId, u64>> {
let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
/* END_HADRON */
    pub async fn tenant_scan_remote_storage(
        &self,
        tenant_id: TenantId,

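Below is a hedged usage sketch for the new management-API helpers above. The client constructor is not part of this diff, so an already-built client is assumed, and the import paths are assumptions based on where these types appear elsewhere in the workspace.

```rust
// Sketch only: module paths and error conversion are assumptions, not from this diff.
use pageserver_client::mgmt_api::Client; // assumed module path
use utils::id::TimelineId;
use utils::shard::TenantShardId; // assumed location of TenantShardId

async fn force_image_layers(
    client: &Client,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
) -> anyhow::Result<()> {
    // Fetch timeline info, including the image-consistent LSN the helper requests above.
    let _info = client
        .tenant_timeline_describe(&tenant_shard_id, &timeline_id)
        .await?;

    // Schedule a compaction that must create image layers, and wait until the
    // scheduled compaction (and the resulting uploads) is done.
    client
        .tenant_timeline_compact(
            tenant_shard_id,
            timeline_id,
            /* force_image_layer_creation */ true,
            /* must_force_image_layer_creation */ true,
            /* scheduled */ true,
            /* wait_until_done */ true,
        )
        .await?;
    Ok(())
}
```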

@@ -9,6 +9,7 @@ testing = ["pageserver_api/testing"]

 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 bytes.workspace = true
 compute_api.workspace = true
 futures.workspace = true


@@ -3,8 +3,10 @@ use std::num::NonZero;
 use std::sync::Arc;

 use anyhow::anyhow;
+use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
+use tonic::codec::CompressionEncoding;
 use tracing::instrument;

 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
@@ -55,28 +57,85 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
 /// TODO: this client does not support base backups or LSN leases, as these are only used by
 /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
 pub struct PageserverClient {
-    // TODO: support swapping out the shard map, e.g. via an ArcSwap.
-    shards: Shards,
+    /// The tenant ID.
+    tenant_id: TenantId,
+    /// The timeline ID.
+    timeline_id: TimelineId,
+    /// The JWT auth token for this tenant, if any.
+    auth_token: Option<String>,
+    /// The compression to use, if any.
+    compression: Option<CompressionEncoding>,
+    /// The shards for this tenant.
+    shards: ArcSwap<Shards>,
+    /// The retry configuration.
     retry: Retry,
 }

 impl PageserverClient {
     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
-    /// in the shard map, which must be complete and must use gRPC URLs.
+    /// in the shard spec, which must be complete and must use gRPC URLs.
     pub fn new(
         tenant_id: TenantId,
         timeline_id: TimelineId,
-        shard_map: HashMap<ShardIndex, String>,
-        stripe_size: ShardStripeSize,
+        shard_spec: ShardSpec,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
     ) -> anyhow::Result<Self> {
-        let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
+        let shards = Shards::new(
+            tenant_id,
+            timeline_id,
+            shard_spec,
+            auth_token.clone(),
+            compression,
+        )?;
         Ok(Self {
-            shards,
+            tenant_id,
+            timeline_id,
+            auth_token,
+            compression,
+            shards: ArcSwap::new(Arc::new(shards)),
             retry: Retry,
         })
     }
/// Updates the shards from the given shard spec. In-flight requests will complete using the
/// existing shards, but may retry with the new shards if they fail.
///
/// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
/// properly spun down and dropped afterwards.
pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
// Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
// with concurrent updates, but that involves creating a new `Shards` on every attempt,
// which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
// in the stack, and if they're violated then we already have problems elsewhere, so a
// best-effort but possibly-racy check is okay here.
let old = self.shards.load_full();
if shard_spec.count < old.count {
return Err(anyhow!(
"can't reduce shard count from {} to {}",
old.count,
shard_spec.count
));
}
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
return Err(anyhow!(
"can't change stripe size from {} to {}",
old.stripe_size,
shard_spec.stripe_size
));
}
let shards = Shards::new(
self.tenant_id,
self.timeline_id,
shard_spec,
self.auth_token.clone(),
self.compression,
)?;
self.shards.store(Arc::new(shards));
Ok(())
}
    /// Returns whether a relation exists.
    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
    pub async fn check_rel_exists(
@@ -84,9 +143,9 @@ impl PageserverClient {
         req: page_api::CheckRelExistsRequest,
     ) -> tonic::Result<page_api::CheckRelExistsResponse> {
         self.retry
-            .with(async || {
+            .with(async |_| {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.check_rel_exists(req).await
             })
             .await
@@ -99,16 +158,17 @@ impl PageserverClient {
         req: page_api::GetDbSizeRequest,
     ) -> tonic::Result<page_api::GetDbSizeResponse> {
         self.retry
-            .with(async || {
+            .with(async |_| {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_db_size(req).await
             })
             .await
     }

-    /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
-    /// splits requests that straddle shard boundaries, and assembles the responses.
+    /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
+    /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
+    /// shard boundaries, and assembles the responses.
     ///
     /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
     /// errors. All responses will have `GetPageStatusCode::Ok`.
@@ -128,72 +188,96 @@ impl PageserverClient {
         if req.block_numbers.is_empty() {
             return Err(tonic::Status::invalid_argument("no block number"));
         }

+        // The request attempt must be 0. The client will increment it internally.
+        if req.request_id.attempt != 0 {
+            return Err(tonic::Status::invalid_argument("request attempt must be 0"));
+        }
+
+        // The shards may change while we're fetching pages. We execute the request using a stable
+        // view of the shards (especially important for requests that span shards), but retry the
+        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
+        // retries and re-splits in some cases where requests span shards, but these are expected to
+        // be rare.
+        //
+        // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
+        // once we figure out how to handle these.
+        self.retry
+            .with(async |attempt| {
+                let mut req = req.clone();
+                req.request_id.attempt = attempt as u32;
+                Self::get_page_with_shards(req, &self.shards.load_full()).await
+            })
+            .await
+    }
+
+    /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
+    /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
+    async fn get_page_with_shards(
+        req: page_api::GetPageRequest,
+        shards: &Shards,
+    ) -> tonic::Result<page_api::GetPageResponse> {
         // Fast path: request is for a single shard.
         if let Some(shard_id) =
-            GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
+            GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
         {
-            return self.get_page_for_shard(shard_id, req).await;
+            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
         }

         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
         // reassemble the responses.
-        //
-        // TODO: when we support shard map updates, we need to detect when it changes and re-split
-        // the request on errors.
-        let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
+        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);

-        let mut shard_requests: FuturesUnordered<_> = splitter
-            .drain_requests()
-            .map(|(shard_id, shard_req)| {
-                // NB: each request will retry internally.
-                self.get_page_for_shard(shard_id, shard_req)
-                    .map(move |result| result.map(|resp| (shard_id, resp)))
-            })
-            .collect();
+        let mut shard_requests = FuturesUnordered::new();
+        for (shard_id, shard_req) in splitter.drain_requests() {
+            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
+                .map(move |result| result.map(|resp| (shard_id, resp)));
+            shard_requests.push(future);
+        }

         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
             splitter.add_response(shard_id, shard_response)?;
         }

-        splitter.assemble_response()
+        splitter.get_response()
     }

-    /// Fetches pages that belong to the given shard.
-    #[instrument(skip_all, fields(shard = %shard_id))]
-    async fn get_page_for_shard(
-        &self,
-        shard_id: ShardIndex,
+    /// Fetches pages on the given shard. Does not retry internally.
+    async fn get_page_with_shard(
         req: page_api::GetPageRequest,
+        shard: &Shard,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let resp = self
-            .retry
-            .with(async || {
-                let stream = self
-                    .shards
-                    .get(shard_id)?
-                    .stream(req.request_class.is_bulk())
-                    .await;
-                let resp = stream.send(req.clone()).await?;
+        let stream = shard.stream(req.request_class.is_bulk()).await;
+        let resp = stream.send(req.clone()).await?;

         // Convert per-request errors into a tonic::Status.
         if resp.status_code != page_api::GetPageStatusCode::Ok {
             return Err(tonic::Status::new(
                 resp.status_code.into(),
                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
             ));
         }
-                Ok(resp)
-            })
-            .await?;

-        // Make sure we got the right number of pages.
-        // NB: check outside of the retry loop, since we don't want to retry this.
-        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
-        if expected != actual {
+        // Check that we received the expected pages.
+        if req.rel != resp.rel {
             return Err(tonic::Status::internal(format!(
-                "expected {expected} pages for shard {shard_id}, got {actual}",
+                "shard {} returned wrong relation, expected {} got {}",
+                shard.id, req.rel, resp.rel
+            )));
+        }
+        if !req
+            .block_numbers
+            .iter()
+            .copied()
+            .eq(resp.pages.iter().map(|p| p.block_number))
+        {
+            return Err(tonic::Status::internal(format!(
+                "shard {} returned wrong pages, expected {:?} got {:?}",
+                shard.id,
+                req.block_numbers,
+                resp.pages
+                    .iter()
+                    .map(|page| page.block_number)
+                    .collect::<Vec<_>>()
             )));
         }
@@ -207,9 +291,9 @@ impl PageserverClient {
         req: page_api::GetRelSizeRequest,
     ) -> tonic::Result<page_api::GetRelSizeResponse> {
         self.retry
-            .with(async || {
+            .with(async |_| {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_rel_size(req).await
             })
             .await
@@ -222,50 +306,53 @@ impl PageserverClient {
         req: page_api::GetSlruSegmentRequest,
     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
         self.retry
-            .with(async || {
+            .with(async |_| {
                 // SLRU segments are only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_slru_segment(req).await
             })
             .await
     }
 }
/// Tracks the tenant's shards. /// Shard specification for a PageserverClient.
struct Shards { pub struct ShardSpec {
/// Maps shard indices to gRPC URLs.
///
/// INVARIANT: every shard 0..count is present, and shard 0 is always present.
/// INVARIANT: every URL is valid and uses grpc:// scheme.
urls: HashMap<ShardIndex, String>,
/// The shard count. /// The shard count.
/// ///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount, count: ShardCount,
/// The stripe size. Only used for sharded tenants. /// The stripe size for these shards.
stripe_size: ShardStripeSize, stripe_size: ShardStripeSize,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
map: HashMap<ShardIndex, Shard>,
} }
impl Shards { impl ShardSpec {
/// Creates a new set of shards based on a shard map. /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
fn new( /// The stripe size may be omitted for unsharded tenants.
tenant_id: TenantId, pub fn new(
timeline_id: TimelineId, urls: HashMap<ShardIndex, String>,
shard_map: HashMap<ShardIndex, String>, stripe_size: Option<ShardStripeSize>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let count = match shard_map.len() { // Compute the shard count.
let count = match urls.len() {
0 => return Err(anyhow!("no shards provided")), 0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()` 1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")), n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8), n => ShardCount::new(n as u8),
}; };
let mut map = HashMap::new(); // Determine the stripe size. It doesn't matter for unsharded tenants.
for (shard_id, url) in shard_map { if stripe_size.is_none() && !count.is_unsharded() {
return Err(anyhow!("stripe size must be given for sharded tenants"));
}
let stripe_size = stripe_size.unwrap_or_default();
// Validate the shard spec.
for (shard_id, url) in &urls {
// The shard index must match the computed shard count, even for unsharded tenants. // The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count { if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}")); return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
@@ -276,21 +363,72 @@ impl Shards {
} }
// The above conditions guarantee that we have all shards 0..count: len() matches count, // The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap). // shard number < count, and numbers are unique (via hashmap).
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
map.insert(shard_id, shard); // Validate the URL.
if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
} }
Ok(Self { Ok(Self {
urls,
count, count,
stripe_size, stripe_size,
map, })
}
}
/// Tracks the tenant's shards.
struct Shards {
/// Shards by shard index.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
by_index: HashMap<ShardIndex, Shard>,
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
}
impl Shards {
/// Creates a new set of shards based on a shard spec.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_spec: ShardSpec,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// NB: the shard spec has already been validated when constructed.
let mut shards = HashMap::with_capacity(shard_spec.urls.len());
for (shard_id, url) in shard_spec.urls {
shards.insert(
shard_id,
Shard::new(
url,
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
)?,
);
}
Ok(Self {
by_index: shards,
count: shard_spec.count,
stripe_size: shard_spec.stripe_size,
}) })
} }
/// Looks up the given shard. /// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact #[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> { fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.map self.by_index
.get(&shard_id) .get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}"))) .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
} }
@@ -312,6 +450,8 @@ impl Shards {
/// * Bulk client pool: unbounded. /// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. /// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard { struct Shard {
/// The shard ID.
id: ShardIndex,
/// Unary gRPC client pool. /// Unary gRPC client pool.
client_pool: Arc<ClientPool>, client_pool: Arc<ClientPool>,
/// GetPage stream pool. /// GetPage stream pool.
@@ -328,12 +468,8 @@ impl Shard {
timeline_id: TimelineId, timeline_id: TimelineId,
shard_id: ShardIndex, shard_id: ShardIndex,
auth_token: Option<String>, auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
// Sanity-check that the URL uses gRPC.
if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
// Common channel pool for unary and stream requests. Bounded by client/stream pools. // Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
@@ -344,6 +480,7 @@ impl Shard {
timeline_id, timeline_id,
shard_id, shard_id,
auth_token.clone(), auth_token.clone(),
compression,
Some(MAX_UNARY_CLIENTS), Some(MAX_UNARY_CLIENTS),
); );
@@ -356,6 +493,7 @@ impl Shard {
timeline_id, timeline_id,
shard_id, shard_id,
auth_token.clone(), auth_token.clone(),
compression,
None, // unbounded, limited by stream pool None, // unbounded, limited by stream pool
), ),
Some(MAX_STREAMS), Some(MAX_STREAMS),
@@ -371,6 +509,7 @@ impl Shard {
timeline_id, timeline_id,
shard_id, shard_id,
auth_token, auth_token,
compression,
None, // unbounded, limited by stream pool None, // unbounded, limited by stream pool
), ),
Some(MAX_BULK_STREAMS), Some(MAX_BULK_STREAMS),
@@ -378,6 +517,7 @@ impl Shard {
); );
Ok(Self { Ok(Self {
id: shard_id,
client_pool, client_pool,
stream_pool, stream_pool,
bulk_stream_pool, bulk_stream_pool,
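To make the reworked constructor concrete, here is a rough usage sketch based only on the signatures visible above (`ShardSpec::new`, `PageserverClient::new`). The tenant/timeline IDs and the URL are placeholders; the port is an assumption.

```rust
// Hedged sketch: builds a client for an unsharded tenant from a single gRPC URL.
use std::collections::HashMap;

use pageserver_client_grpc::{PageserverClient, ShardSpec};
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

fn build_client(tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PageserverClient> {
    // Unsharded tenant: one URL under ShardIndex::unsharded(); no stripe size required.
    // The address is a placeholder; it must use the grpc:// scheme per the validation above.
    let urls = HashMap::from([(ShardIndex::unsharded(), "grpc://localhost:51051".to_string())]);
    let spec = ShardSpec::new(urls, None)?;
    PageserverClient::new(tenant_id, timeline_id, spec, None /* auth token */, None /* compression */)
}
```

A later `update_shards(new_spec)` call can then swap in a new shard set at runtime, subject to the checks shown above: the shard count may not decrease, and the stripe size is fixed once the tenant is sharded.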


@@ -3,4 +3,4 @@ mod pool;
 mod retry;
 mod split;

-pub use client::PageserverClient;
+pub use client::{PageserverClient, ShardSpec};


@@ -40,6 +40,7 @@ use futures::StreamExt as _;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
 use tokio_util::sync::CancellationToken;
+use tonic::codec::CompressionEncoding;
 use tonic::transport::{Channel, Endpoint};
 use tracing::{error, warn};
@@ -242,6 +243,8 @@ pub struct ClientPool {
     shard_id: ShardIndex,
     /// Authentication token, if any.
     auth_token: Option<String>,
+    /// Compression to use.
+    compression: Option<CompressionEncoding>,
     /// Channel pool to acquire channels from.
     channel_pool: Arc<ChannelPool>,
     /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
@@ -281,6 +284,7 @@ impl ClientPool {
         timeline_id: TimelineId,
         shard_id: ShardIndex,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
         max_clients: Option<NonZero<usize>>,
     ) -> Arc<Self> {
         let pool = Arc::new(Self {
@@ -288,6 +292,7 @@ impl ClientPool {
             timeline_id,
             shard_id,
             auth_token,
+            compression,
             channel_pool,
             idle: Mutex::default(),
             idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
@@ -331,7 +336,7 @@ impl ClientPool {
             self.timeline_id,
             self.shard_id,
             self.auth_token.clone(),
-            None,
+            self.compression,
         )?;

         Ok(ClientGuard {
@@ -586,6 +591,10 @@ impl StreamPool {
         // Track caller response channels by request ID. If the task returns early, these response
         // channels will be dropped and the waiting callers will receive an error.
+        //
+        // NB: this will leak entries if the server doesn't respond to a request (by request ID).
+        // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
+        // block further use. But we could consider reaping closed channels after some time.
         let mut callers = HashMap::new();

         // Process requests and responses.
@@ -690,6 +699,15 @@ impl Drop for StreamGuard {
         // Release the queue depth reservation on drop. This can prematurely decrement it if dropped
         // before the response is received, but that's okay.
+        //
+        // TODO: actually, it's probably not okay. Queue depth release should be moved into the
+        // stream task, such that it continues to account for the queue depth slot until the server
+        // responds. Otherwise, if a slow request times out and keeps blocking the stream, the
+        // server will keep waiting on it and we can pile on subsequent requests (including the
+        // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
+        // requests on e.g. LSN waits and layer downloads, instead returning early to free up the
+        // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
+        // blocking. TBD.
         let mut streams = pool.streams.lock().unwrap();
         let entry = streams.get_mut(&self.id).expect("unknown stream");
         assert!(entry.idle_since.is_none(), "active stream marked idle");


@@ -23,14 +23,14 @@ impl Retry {
     /// If true, log successful requests. For debugging.
     const LOG_SUCCESS: bool = false;

-    /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
-    /// using the current tracing span for context.
+    /// Runs the given async closure with timeouts and retries (exponential backoff), passing the
+    /// attempt number starting at 0. Logs errors, using the current tracing span for context.
     ///
     /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
     /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
     pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
     where
-        F: FnMut() -> O,
+        F: FnMut(usize) -> O, // takes attempt number, starting at 0
         O: Future<Output = tonic::Result<T>>,
     {
         let started = Instant::now();
@@ -47,7 +47,7 @@ impl Retry {
             }

             let request_started = Instant::now();
-            tokio::time::timeout(Self::REQUEST_TIMEOUT, f())
+            tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
                 .await
                 .map_err(|_| {
                     tonic::Status::deadline_exceeded(format!(
@@ -131,7 +131,6 @@ impl Retry {
             tonic::Code::Aborted => true,
             tonic::Code::Cancelled => true,
             tonic::Code::DeadlineExceeded => true, // maybe transient slowness
-            tonic::Code::Internal => true,         // maybe transient failure?
             tonic::Code::ResourceExhausted => true,
             tonic::Code::Unavailable => true,
@@ -139,6 +138,10 @@ impl Retry {
             tonic::Code::AlreadyExists => false,
             tonic::Code::DataLoss => false,
             tonic::Code::FailedPrecondition => false,
+            // NB: don't retry Internal. It is intended for serious errors such as invariant
+            // violations, and is also used for client-side invariant checks that would otherwise
+            // result in retry loops.
+            tonic::Code::Internal => false,
             tonic::Code::InvalidArgument => false,
             tonic::Code::NotFound => false,
             tonic::Code::OutOfRange => false,
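Since `Retry` is internal to this crate, the following is only an in-crate sketch of the new closure shape: the closure now receives the attempt number (starting at 0), which `get_page()` threads into `request_id.attempt`. The request body here is a placeholder; only the `Retry::with` signature comes from the diff above.

```rust
// Hedged sketch of the new FnMut(usize) closure shape for Retry::with.
async fn fetch_with_retry(retry: &Retry) -> tonic::Result<String> {
    retry
        .with(async |attempt| {
            // The attempt number can be recorded or embedded in the request, as
            // get_page() does above with `req.request_id.attempt = attempt as u32`.
            tracing::debug!("sending request, attempt {attempt}");
            Ok(format!("response for attempt {attempt}"))
        })
        .await
}
```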


@@ -5,27 +5,24 @@ use bytes::Bytes;
use pageserver_api::key::rel_block_to_key; use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number}; use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api; use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex}; use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this. /// TODO: add tests for this.
pub struct GetPageSplitter { pub struct GetPageSplitter {
/// The original request ID. Used for all shard requests.
request_id: page_api::RequestID,
/// Split requests by shard index. /// Split requests by shard index.
requests: HashMap<ShardIndex, page_api::GetPageRequest>, requests: HashMap<ShardIndex, page_api::GetPageRequest>,
/// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble /// The response being assembled. Preallocated with empty pages, to be filled in.
/// the response pages in the same order as the original request. response: page_api::GetPageResponse,
/// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
/// to assemble the response pages in the same order as the original request.
block_shards: Vec<ShardIndex>, block_shards: Vec<ShardIndex>,
/// Page responses by shard index. Will be assembled into a single response.
responses: HashMap<ShardIndex, Vec<Bytes>>,
} }
impl GetPageSplitter { impl GetPageSplitter {
/// Checks if the given request only touches a single shard, and returns the shard ID. This is /// Checks if the given request only touches a single shard, and returns the shard ID. This is
/// the common case, so we check first in order to avoid unnecessary allocations and overhead. /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
/// The caller must ensure that the request has at least one block number, or this will panic. pub fn for_single_shard(
pub fn is_single_shard(
req: &page_api::GetPageRequest, req: &page_api::GetPageRequest,
count: ShardCount, count: ShardCount,
stripe_size: ShardStripeSize, stripe_size: ShardStripeSize,
@@ -35,8 +32,12 @@ impl GetPageSplitter {
return Some(ShardIndex::unsharded()); return Some(ShardIndex::unsharded());
} }
// Find the base shard index for the first page, and compare with the rest. // Find the first page's shard, for comparison. If there are no pages, just return the first
let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages")); // shard (caller likely checked already, otherwise the server will reject it).
let Some(&first_page) = req.block_numbers.first() else {
return Some(ShardIndex::new(ShardNumber(0), count));
};
let key = rel_block_to_key(req.rel, first_page);
let shard_number = key_to_shard_number(count, stripe_size, &key); let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers req.block_numbers
@@ -57,19 +58,19 @@ impl GetPageSplitter {
) -> Self { ) -> Self {
// The caller should make sure we don't split requests unnecessarily. // The caller should make sure we don't split requests unnecessarily.
debug_assert!( debug_assert!(
Self::is_single_shard(&req, count, stripe_size).is_none(), Self::for_single_shard(&req, count, stripe_size).is_none(),
"unnecessary request split" "unnecessary request split"
); );
// Split the requests by shard index. // Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len()); let mut block_shards = Vec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers { for &blkno in &req.block_numbers {
let key = rel_block_to_key(req.rel, blkno); let key = rel_block_to_key(req.rel, blkno);
let shard_number = key_to_shard_number(count, stripe_size, &key); let shard_number = key_to_shard_number(count, stripe_size, &key);
let shard_id = ShardIndex::new(shard_number, count); let shard_id = ShardIndex::new(shard_number, count);
let shard_req = requests requests
.entry(shard_id) .entry(shard_id)
.or_insert_with(|| page_api::GetPageRequest { .or_insert_with(|| page_api::GetPageRequest {
request_id: req.request_id, request_id: req.request_id,
@@ -77,27 +78,47 @@ impl GetPageSplitter {
rel: req.rel, rel: req.rel,
read_lsn: req.read_lsn, read_lsn: req.read_lsn,
block_numbers: Vec::new(), block_numbers: Vec::new(),
}); })
shard_req.block_numbers.push(blkno); .block_numbers
.push(blkno);
block_shards.push(shard_id); block_shards.push(shard_id);
} }
Self { // Construct a response to be populated by shard responses. Preallocate empty page slots
// with the expected block numbers.
let response = page_api::GetPageResponse {
request_id: req.request_id, request_id: req.request_id,
responses: HashMap::with_capacity(requests.len()), status_code: page_api::GetPageStatusCode::Ok,
reason: None,
rel: req.rel,
pages: req
.block_numbers
.into_iter()
.map(|block_number| {
page_api::Page {
block_number,
image: Bytes::new(), // empty page slot to be filled in
}
})
.collect(),
};
Self {
requests, requests,
response,
block_shards, block_shards,
} }
} }
/// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations. /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
pub fn drain_requests( pub fn drain_requests(
&mut self, &mut self,
) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> { ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
self.requests.drain() self.requests.drain()
} }
/// Adds a response from the given shard. /// Adds a response from the given shard. The response must match the request ID and have an OK
/// status code. A response must not already exist for the given shard ID.
#[allow(clippy::result_large_err)] #[allow(clippy::result_large_err)]
pub fn add_response( pub fn add_response(
&mut self, &mut self,
@@ -105,68 +126,84 @@ impl GetPageSplitter {
response: page_api::GetPageResponse, response: page_api::GetPageResponse,
) -> tonic::Result<()> { ) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status. // The caller should already have converted status codes into tonic::Status.
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok); if response.status_code != page_api::GetPageStatusCode::Ok {
// Make sure the response matches the request ID.
if response.request_id != self.request_id {
return Err(tonic::Status::internal(format!( return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}", "unexpected non-OK response for shard {shard_id}: {} {}",
response.request_id, self.request_id response.status_code,
response.reason.unwrap_or_default()
))); )));
} }
// Add the response data to the map. if response.request_id != self.response.request_id {
let old = self.responses.insert(shard_id, response.page_images);
if old.is_some() {
return Err(tonic::Status::internal(format!( return Err(tonic::Status::internal(format!(
"duplicate response for shard {shard_id}", "response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id, response.request_id
)));
}
// Place the shard response pages into the assembled response, in request order.
let mut pages = response.pages.into_iter();
for (i, &s) in self.block_shards.iter().enumerate() {
if shard_id != s {
continue;
}
let Some(slot) = self.response.pages.get_mut(i) else {
return Err(tonic::Status::internal(format!(
"no block_shards slot {i} for shard {shard_id}"
)));
};
let Some(page) = pages.next() else {
return Err(tonic::Status::internal(format!(
"missing page {} in shard {shard_id} response",
slot.block_number
)));
};
if page.block_number != slot.block_number {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
slot.block_number, page.block_number
)));
}
if !slot.image.is_empty() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned duplicate page {} at index {i}",
slot.block_number
)));
}
*slot = page;
}
// Make sure we've consumed all pages from the shard response.
if let Some(extra_page) = pages.next() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned extra page: {}",
extra_page.block_number
))); )));
} }
Ok(()) Ok(())
} }
/// Assembles the shard responses into a single response. Responses must be present for all /// Fetches the final, assembled response.
/// relevant shards, and the total number of pages must match the original request.
#[allow(clippy::result_large_err)] #[allow(clippy::result_large_err)]
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> { pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
let mut response = page_api::GetPageResponse { // Check that the response is complete.
request_id: self.request_id, for (i, page) in self.response.pages.iter().enumerate() {
status_code: page_api::GetPageStatusCode::Ok, if page.image.is_empty() {
reason: None,
page_images: Vec::with_capacity(self.block_shards.len()),
};
// Set up per-shard page iterators we can pull from.
let mut shard_responses = HashMap::with_capacity(self.responses.len());
for (shard_id, responses) in self.responses {
shard_responses.insert(shard_id, responses.into_iter());
}
// Reassemble the responses in the same order as the original request.
for shard_id in &self.block_shards {
let page = shard_responses
.get_mut(shard_id)
.ok_or_else(|| {
tonic::Status::internal(format!("missing response for shard {shard_id}"))
})?
.next()
.ok_or_else(|| {
tonic::Status::internal(format!("missing page from shard {shard_id}"))
})?;
response.page_images.push(page);
}
// Make sure there are no additional pages.
for (shard_id, mut pages) in shard_responses {
if pages.next().is_some() {
return Err(tonic::Status::internal(format!( return Err(tonic::Status::internal(format!(
"extra pages returned from shard {shard_id}" "missing page {} for shard {}",
page.block_number,
self.block_shards
.get(i)
.map(|s| s.to_string())
.unwrap_or_else(|| "?".to_string())
))); )));
} }
} }
Ok(response) Ok(self.response)
} }
} }
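For orientation, here is a small sketch of the routing rule the splitter relies on, using the same helper functions called above (`rel_block_to_key`, `key_to_shard_number`, `ShardIndex::new`). The `RelTag` import path is an assumption about where `req.rel`'s type lives.

```rust
// Hedged sketch: maps a (rel, block) pair to the shard that owns it.
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::RelTag; // assumed type/path of `req.rel`
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use utils::shard::{ShardCount, ShardIndex};

fn owning_shard(rel: RelTag, blkno: u32, count: ShardCount, stripe_size: ShardStripeSize) -> ShardIndex {
    // Unsharded tenants have a single shard, so no key math is needed.
    if count.is_unsharded() {
        return ShardIndex::unsharded();
    }
    // Map the block to a key, then the key to a shard number under this stripe size.
    let key = rel_block_to_key(rel, blkno);
    ShardIndex::new(key_to_shard_number(count, stripe_size, &key), count)
}
```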


@@ -17,6 +17,7 @@ pageserver = { path = ".." }
 pageserver_api.workspace = true
 remote_storage = { path = "../../libs/remote_storage" }
 postgres_ffi.workspace = true
+serde.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true


@@ -0,0 +1,85 @@
use camino::Utf8PathBuf;
use clap::Parser;
use tokio_util::sync::CancellationToken;
/// Download a specific object from remote storage to a local file.
///
/// The remote storage configuration is supplied via the `REMOTE_STORAGE_CONFIG` environment
/// variable, in the same TOML format that the pageserver itself understands. This allows the
/// command to work with any cloud supported by the `remote_storage` crate (currently AWS S3,
/// Azure Blob Storage and local files), as long as the credentials are available via the
/// standard environment variables expected by the underlying SDKs.
///
/// Examples for setting the environment variable:
///
/// ```bash
/// # AWS S3 (region can also be provided via AWS_REGION)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "my-bucket", bucket_region = "us-east-2" }'
///
/// # Azure Blob Storage (account key picked up from AZURE_STORAGE_ACCOUNT_KEY)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { container = "my-container", account = "my-account" }'
/// ```
#[derive(Parser)]
pub(crate) struct DownloadRemoteObjectCmd {
/// Key / path of the object to download (relative to the remote storage prefix).
///
/// Examples:
/// "wal/3aa8f.../00000001000000000000000A"
/// "pageserver/v1/tenants/<tenant_id>/timelines/<timeline_id>/layer_12345"
pub remote_path: String,
/// Path of the local file to create. Existing file will be overwritten.
///
/// Examples:
/// "./segment"
/// "/tmp/layer_12345.parquet"
pub output_file: Utf8PathBuf,
}
pub(crate) async fn main(cmd: &DownloadRemoteObjectCmd) -> anyhow::Result<()> {
use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath, RemoteStorageConfig};
// Fetch remote storage configuration from the environment
let config_str = std::env::var("REMOTE_STORAGE_CONFIG").map_err(|_| {
anyhow::anyhow!(
"'REMOTE_STORAGE_CONFIG' environment variable must be set to a valid remote storage TOML config"
)
})?;
let config = RemoteStorageConfig::from_toml_str(&config_str)?;
// Initialise remote storage client
let storage = GenericRemoteStorage::from_config(&config).await?;
// RemotePath must be relative leading slashes confuse the parser.
let remote_path_str = cmd.remote_path.trim_start_matches('/');
let remote_path = RemotePath::from_string(remote_path_str)?;
let cancel = CancellationToken::new();
println!(
"Downloading '{remote_path}' from remote storage bucket {:?} ...",
config.storage.bucket_name()
);
// Start the actual download
let download = storage
.download(&remote_path, &DownloadOpts::default(), &cancel)
.await?;
// Stream to file
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let tmp_path = cmd.output_file.with_extension("tmp");
let mut file = tokio::fs::File::create(&tmp_path).await?;
tokio::io::copy(&mut reader, &mut file).await?;
file.sync_all().await?;
// Atomically move into place
tokio::fs::rename(&tmp_path, &cmd.output_file).await?;
println!(
"Downloaded to '{}'. Last modified: {:?}, etag: {}",
cmd.output_file, download.last_modified, download.etag
);
Ok(())
}

View File

@@ -1,14 +1,16 @@
use std::str::FromStr; use std::str::FromStr;
use anyhow::Context; use anyhow::{Context, Ok};
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
use pageserver::tenant::{ use pageserver::tenant::{
IndexPart, IndexPart,
layer_map::{LayerMap, SearchResult}, layer_map::{LayerMap, SearchResult},
remote_timeline_client::remote_layer_path, remote_timeline_client::{index::LayerFileMetadata, remote_layer_path},
storage_layer::{PersistentLayerDesc, ReadableLayerWeak}, storage_layer::{LayerName, LayerVisibilityHint, PersistentLayerDesc, ReadableLayerWeak},
}; };
use pageserver_api::key::Key; use pageserver_api::key::Key;
use serde::Serialize;
use std::collections::BTreeMap;
use utils::{ use utils::{
id::{TenantId, TimelineId}, id::{TenantId, TimelineId},
lsn::Lsn, lsn::Lsn,
@@ -33,6 +35,31 @@ pub(crate) enum IndexPartCmd {
#[arg(long)] #[arg(long)]
lsn: String, lsn: String,
}, },
/// List all visible delta and image layers at the latest LSN.
ListVisibleLayers {
#[arg(long)]
path: Utf8PathBuf,
},
}
fn create_layer_map_from_index_part(
index_part: &IndexPart,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> LayerMap {
let mut layer_map = LayerMap::default();
{
let mut updates = layer_map.batch_update();
for (key, value) in index_part.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
layer_map
} }
async fn search_layers( async fn search_layers(
@@ -49,18 +76,7 @@ async fn search_layers(
let bytes = tokio::fs::read(path).await?; let bytes = tokio::fs::read(path).await?;
IndexPart::from_json_bytes(&bytes).unwrap() IndexPart::from_json_bytes(&bytes).unwrap()
}; };
let mut layer_map = LayerMap::default(); let layer_map = create_layer_map_from_index_part(&index_json, tenant_shard_id, timeline_id);
{
let mut updates = layer_map.batch_update();
for (key, value) in index_json.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
let key = Key::from_hex(key)?; let key = Key::from_hex(key)?;
let lsn = Lsn::from_str(lsn).unwrap(); let lsn = Lsn::from_str(lsn).unwrap();
@@ -98,6 +114,69 @@ async fn search_layers(
Ok(()) Ok(())
} }
#[derive(Debug, Clone, Serialize)]
struct VisibleLayers {
pub total_images: u64,
pub total_image_bytes: u64,
pub total_deltas: u64,
pub total_delta_bytes: u64,
pub layer_metadata: BTreeMap<LayerName, LayerFileMetadata>,
}
impl VisibleLayers {
pub fn new() -> Self {
Self {
layer_metadata: BTreeMap::new(),
total_images: 0,
total_image_bytes: 0,
total_deltas: 0,
total_delta_bytes: 0,
}
}
pub fn add_layer(&mut self, name: LayerName, layer: LayerFileMetadata) {
match name {
LayerName::Image(_) => {
self.total_images += 1;
self.total_image_bytes += layer.file_size;
}
LayerName::Delta(_) => {
self.total_deltas += 1;
self.total_delta_bytes += layer.file_size;
}
}
self.layer_metadata.insert(name, layer);
}
}
async fn list_visible_layers(path: &Utf8PathBuf) -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::generate();
let bytes = tokio::fs::read(path).await.context("read file")?;
let index_part = IndexPart::from_json_bytes(&bytes).context("deserialize")?;
let layer_map = create_layer_map_from_index_part(&index_part, tenant_shard_id, timeline_id);
let mut visible_layers = VisibleLayers::new();
let (layers, _key_space) = layer_map.get_visibility(Vec::new());
for (layer, visibility) in layers {
if visibility == LayerVisibilityHint::Visible {
visible_layers.add_layer(
layer.layer_name(),
index_part
.layer_metadata
.get(&layer.layer_name())
.unwrap()
.clone(),
);
}
}
let output = serde_json::to_string_pretty(&visible_layers).context("serialize output")?;
println!("{output}");
Ok(())
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> { pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd { match cmd {
IndexPartCmd::Dump { path } => { IndexPartCmd::Dump { path } => {
@@ -114,5 +193,6 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
key, key,
lsn, lsn,
} => search_layers(tenant_id, timeline_id, path, key, lsn).await, } => search_layers(tenant_id, timeline_id, path, key, lsn).await,
IndexPartCmd::ListVisibleLayers { path } => list_visible_layers(path).await,
} }
} }

View File

@@ -4,6 +4,7 @@
//! //!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file. //! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod download_remote_object;
mod draw_timeline_dir; mod draw_timeline_dir;
mod index_part; mod index_part;
mod key; mod key;
@@ -16,6 +17,7 @@ use std::time::{Duration, SystemTime};
use camino::{Utf8Path, Utf8PathBuf}; use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use download_remote_object::DownloadRemoteObjectCmd;
use index_part::IndexPartCmd; use index_part::IndexPartCmd;
use layers::LayerCmd; use layers::LayerCmd;
use page_trace::PageTraceCmd; use page_trace::PageTraceCmd;
@@ -63,6 +65,7 @@ enum Commands {
/// Debug print a hex key found from logs /// Debug print a hex key found from logs
Key(key::DescribeKeyCommand), Key(key::DescribeKeyCommand),
PageTrace(PageTraceCmd), PageTrace(PageTraceCmd),
DownloadRemoteObject(DownloadRemoteObjectCmd),
} }
/// Read and update pageserver metadata file /// Read and update pageserver metadata file
@@ -185,6 +188,9 @@ async fn main() -> anyhow::Result<()> {
} }
Commands::Key(dkc) => dkc.execute(), Commands::Key(dkc) => dkc.execute(),
Commands::PageTrace(cmd) => page_trace::main(&cmd)?, Commands::PageTrace(cmd) => page_trace::main(&cmd)?,
Commands::DownloadRemoteObject(cmd) => {
download_remote_object::main(&cmd).await?;
}
}; };
Ok(()) Ok(())
} }

View File

@@ -165,7 +165,7 @@ message GetDbSizeResponse {
message GetPageRequest { message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for // A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream. // in-flight requests on the stream.
uint64 request_id = 1; RequestID request_id = 1;
// The request class. // The request class.
GetPageClass request_class = 2; GetPageClass request_class = 2;
// The LSN to read at. // The LSN to read at.
@@ -189,6 +189,14 @@ message GetPageRequest {
repeated uint32 block_number = 5; repeated uint32 block_number = 5;
} }
// A Request ID. Should be unique for in-flight requests on a stream. Included in the response.
message RequestID {
// The base request ID.
uint64 id = 1;
// The request attempt. Starts at 0, incremented on each retry.
uint32 attempt = 2;
}
// A GetPageRequest class. Primarily intended for observability, but may also be // A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future. // used for prioritization in the future.
enum GetPageClass { enum GetPageClass {
@@ -211,13 +219,26 @@ enum GetPageClass {
// the entire batch is ready, so no one can make use of the individual pages. // the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse { message GetPageResponse {
// The original request's ID. // The original request's ID.
uint64 request_id = 1; RequestID request_id = 1;
// The response status code. // The response status code. If not OK, the rel and page fields will be empty.
GetPageStatusCode status_code = 2; GetPageStatusCode status_code = 2;
// A string describing the status, if any. // A string describing the status, if any.
string reason = 3; string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status_code != OK. // The relation that the pages belong to.
repeated bytes page_image = 4; RelTag rel = 4;
// The page(s), in the same order as the request.
repeated Page page = 5;
}
// A page.
//
// TODO: it would be slightly more efficient (but less convenient) to have separate arrays of block
// numbers and images, but given the 8KB page size it's probably negligible. Benchmark it anyway.
message Page {
// The page number.
uint32 block_number = 1;
// The materialized page image, as an 8KB byte vector.
bytes image = 2;
} }
// A GetPageResponse status code. // A GetPageResponse status code.

View File

@@ -1,4 +1,5 @@
use anyhow::Context as _; use anyhow::Context as _;
use futures::future::ready;
use futures::{Stream, StreamExt as _, TryStreamExt as _}; use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
@@ -110,7 +111,7 @@ impl Client {
) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> { ) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
let reqs = reqs.map(proto::GetPageRequest::from); let reqs = reqs.map(proto::GetPageRequest::from);
let resps = self.inner.get_pages(reqs).await?.into_inner(); let resps = self.inner.get_pages(reqs).await?.into_inner();
Ok(resps.map_ok(GetPageResponse::from)) Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
} }
/// Returns the size of a relation, as # of blocks. /// Returns the size of a relation, as # of blocks.

View File

@@ -359,7 +359,10 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
return Err(ProtocolError::Missing("block_number")); return Err(ProtocolError::Missing("block_number"));
} }
Ok(Self { Ok(Self {
request_id: pb.request_id, request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
request_class: pb.request_class.into(), request_class: pb.request_class.into(),
read_lsn: pb read_lsn: pb
.read_lsn .read_lsn
@@ -374,7 +377,7 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
impl From<GetPageRequest> for proto::GetPageRequest { impl From<GetPageRequest> for proto::GetPageRequest {
fn from(request: GetPageRequest) -> Self { fn from(request: GetPageRequest) -> Self {
Self { Self {
request_id: request.request_id, request_id: Some(request.request_id.into()),
request_class: request.request_class.into(), request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.into()), read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()), rel: Some(request.rel.into()),
@@ -383,8 +386,51 @@ impl From<GetPageRequest> for proto::GetPageRequest {
} }
} }
/// A GetPage request ID. /// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
pub type RequestID = u64; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestID {
/// The base request ID.
pub id: u64,
// The request attempt. Starts at 0, incremented on each retry.
pub attempt: u32,
}
impl RequestID {
/// Creates a new RequestID with the given ID and an initial attempt of 0.
pub fn new(id: u64) -> Self {
Self { id, attempt: 0 }
}
}
impl Display for RequestID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.id, self.attempt)
}
}
impl From<proto::RequestId> for RequestID {
fn from(pb: proto::RequestId) -> Self {
Self {
id: pb.id,
attempt: pb.attempt,
}
}
}
impl From<u64> for RequestID {
fn from(id: u64) -> Self {
Self::new(id)
}
}
impl From<RequestID> for proto::RequestId {
fn from(request_id: RequestID) -> Self {
Self {
id: request_id.id,
attempt: request_id.attempt,
}
}
}
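For illustration, a minimal sketch of how the ID and retry attempt are intended to be used (assuming `RequestID` is exported from `pageserver_page_api`, as the bench code later in this diff assumes; the retry loop itself is hypothetical):

```rust
use pageserver_page_api as page_api;

// Hypothetical retry loop; the actual RPC call is elided.
fn request_id_example() {
    let mut request_id = page_api::RequestID::new(42); // attempt starts at 0
    for _ in 0..3 {
        // In-flight requests on a stream are correlated by (id, attempt);
        // Display renders "id.attempt", e.g. "42.0", "42.1", "42.2".
        println!("sending request {request_id}");
        request_id.attempt += 1; // incremented on each retry
    }
    // Callers that never retry can convert a plain u64 directly.
    let simple: page_api::RequestID = 7u64.into();
    assert_eq!((simple.id, simple.attempt), (7, 0));
}
```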
/// A GetPage request class. /// A GetPage request class.
#[derive(Clone, Copy, Debug, strum_macros::Display)] #[derive(Clone, Copy, Debug, strum_macros::Display)]
@@ -459,32 +505,41 @@ impl From<GetPageClass> for i32 {
pub struct GetPageResponse { pub struct GetPageResponse {
/// The original request's ID. /// The original request's ID.
pub request_id: RequestID, pub request_id: RequestID,
/// The response status code. /// The response status code. If not OK, the `rel` and `pages` fields will be empty.
pub status_code: GetPageStatusCode, pub status_code: GetPageStatusCode,
/// A string describing the status, if any. /// A string describing the status, if any.
pub reason: Option<String>, pub reason: Option<String>,
/// The 8KB page images, in the same order as the request. Empty if status != OK. /// The relation that the pages belong to.
pub page_images: Vec<Bytes>, pub rel: RelTag,
// The page(s), in the same order as the request.
pub pages: Vec<Page>,
} }
impl From<proto::GetPageResponse> for GetPageResponse { impl TryFrom<proto::GetPageResponse> for GetPageResponse {
fn from(pb: proto::GetPageResponse) -> Self { type Error = ProtocolError;
Self {
request_id: pb.request_id, fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
status_code: pb.status_code.into(), status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()), reason: Some(pb.reason).filter(|r| !r.is_empty()),
page_images: pb.page_image, rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
} pages: pb.page.into_iter().map(Page::from).collect(),
})
} }
} }
impl From<GetPageResponse> for proto::GetPageResponse { impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self { fn from(response: GetPageResponse) -> Self {
Self { Self {
request_id: response.request_id, request_id: Some(response.request_id.into()),
status_code: response.status_code.into(), status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(), reason: response.reason.unwrap_or_default(),
page_image: response.page_images, rel: Some(response.rel.into()),
page: response.pages.into_iter().map(proto::Page::from).collect(),
} }
} }
} }
@@ -517,11 +572,39 @@ impl GetPageResponse {
request_id, request_id,
status_code, status_code,
reason: Some(status.message().to_string()), reason: Some(status.message().to_string()),
page_images: Vec::new(), rel: RelTag::default(),
pages: Vec::new(),
}) })
} }
} }
// A page.
#[derive(Clone, Debug)]
pub struct Page {
/// The page number.
pub block_number: u32,
/// The materialized page image, as an 8KB byte vector.
pub image: Bytes,
}
impl From<proto::Page> for Page {
fn from(pb: proto::Page) -> Self {
Self {
block_number: pb.block_number,
image: pb.image,
}
}
}
impl From<Page> for proto::Page {
fn from(page: Page) -> Self {
Self {
block_number: page.block_number,
image: page.image,
}
}
}
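As a consumer-side illustration of the new response shape (a hedged sketch using the `GetPageRequest`, `GetPageResponse` and `Page` types above; `check_pages` is not a library function): pages are returned in request order and now carry both the relation and their block number, so a client can validate an OK response against its request:

```rust
use pageserver_page_api as page_api;

// Sketch: verify an OK GetPageResponse against the request that produced it.
fn check_pages(
    req: &page_api::GetPageRequest,
    resp: &page_api::GetPageResponse,
) -> anyhow::Result<()> {
    anyhow::ensure!(resp.request_id.id == req.request_id.id, "response for wrong request");
    anyhow::ensure!(
        resp.pages.len() == req.block_numbers.len(),
        "expected {} pages, got {}",
        req.block_numbers.len(),
        resp.pages.len()
    );
    for (want, page) in req.block_numbers.iter().zip(&resp.pages) {
        // Pages come back in the same order as the request and carry their block number.
        anyhow::ensure!(page.block_number == *want, "unexpected block {}", page.block_number);
        anyhow::ensure!(page.image.len() == 8192, "unexpected page size {}", page.image.len());
    }
    Ok(())
}
```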
/// A GetPage response status code. /// A GetPage response status code.
/// ///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream

View File

@@ -27,8 +27,9 @@ tokio-util.workspace = true
tonic.workspace = true tonic.workspace = true
url.workspace = true url.workspace = true
pageserver_client.workspace = true
pageserver_api.workspace = true pageserver_api.workspace = true
pageserver_client.workspace = true
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" } utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" } workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -10,12 +10,14 @@ use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _}; use futures::{Stream, StreamExt as _};
use pageserver_api::key::Key; use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::reltag::RelTag; use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use pageserver_client_grpc::{self as client_grpc, ShardSpec};
use pageserver_page_api as page_api; use pageserver_page_api as page_api;
use rand::prelude::*; use rand::prelude::*;
use tokio::task::JoinSet; use tokio::task::JoinSet;
@@ -37,6 +39,10 @@ pub(crate) struct Args {
/// Pageserver connection string. Supports postgresql:// and grpc:// protocols. /// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
#[clap(long, default_value = "postgres://postgres@localhost:64000")] #[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String, page_service_connstring: String,
/// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
/// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
#[clap(long)]
rich_client: bool,
#[clap(long)] #[clap(long)]
pageserver_jwt: Option<String>, pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")] #[clap(long, default_value = "1")]
@@ -332,6 +338,7 @@ async fn main_impl(
let client: Box<dyn Client> = match scheme.as_str() { let client: Box<dyn Client> = match scheme.as_str() {
"postgresql" | "postgres" => { "postgresql" | "postgres" => {
assert!(!args.compression, "libpq does not support compression"); assert!(!args.compression, "libpq does not support compression");
assert!(!args.rich_client, "rich client requires grpc://");
Box::new( Box::new(
LibpqClient::new(&args.page_service_connstring, worker_id.timeline) LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
.await .await
@@ -339,6 +346,16 @@ async fn main_impl(
) )
} }
"grpc" if args.rich_client => Box::new(
RichGrpcClient::new(
&args.page_service_connstring,
worker_id.timeline,
args.compression,
)
.await
.unwrap(),
),
"grpc" => Box::new( "grpc" => Box::new(
GrpcClient::new( GrpcClient::new(
&args.page_service_connstring, &args.page_service_connstring,
@@ -657,7 +674,7 @@ impl Client for GrpcClient {
blks: Vec<u32>, blks: Vec<u32>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let req = page_api::GetPageRequest { let req = page_api::GetPageRequest {
request_id: req_id, request_id: req_id.into(),
request_class: page_api::GetPageClass::Normal, request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn { read_lsn: page_api::ReadLsn {
request_lsn: req_lsn, request_lsn: req_lsn,
@@ -677,6 +694,79 @@ impl Client for GrpcClient {
"unexpected status code: {}", "unexpected status code: {}",
resp.status_code, resp.status_code,
); );
Ok((resp.request_id, resp.page_images)) Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
}
}
/// A rich gRPC Pageserver client.
struct RichGrpcClient {
inner: Arc<client_grpc::PageserverClient>,
requests: FuturesUnordered<
Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
>,
}
impl RichGrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = Arc::new(client_grpc::PageserverClient::new(
ttid.tenant_id,
ttid.timeline_id,
ShardSpec::new(
[(ShardIndex::unsharded(), connstring.to_string())].into(),
None,
)?,
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)?);
Ok(Self {
inner,
requests: FuturesUnordered::new(),
})
}
}
#[async_trait]
impl Client for RichGrpcClient {
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()> {
let req = page_api::GetPageRequest {
request_id: req_id.into(),
request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn {
request_lsn: req_lsn,
not_modified_since_lsn: Some(mod_lsn),
},
rel,
block_numbers: blks,
};
let inner = self.inner.clone();
self.requests.push(Box::pin(async move {
inner
.get_page(req)
.await
.map_err(|err| anyhow::anyhow!("{err}"))
}));
Ok(())
}
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
let resp = self.requests.next().await.unwrap()?;
Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
} }
} }

View File

@@ -2,7 +2,9 @@
//! Management HTTP API //! Management HTTP API
//! //!
use std::cmp::Reverse; use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap}; use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -3217,6 +3219,30 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError) .map_err(ApiError::InternalServerError)
} }
/// HADRON
async fn list_tenant_visible_size_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let state = get_state(&request);
let mut map = BTreeMap::new();
for (tenant_shard_id, slot) in state.tenant_manager.list() {
match slot {
TenantSlot::Attached(tenant) => {
let visible_size = tenant.get_visible_size();
map.insert(tenant_shard_id, visible_size);
}
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
continue;
}
}
}
json_response(StatusCode::OK, map)
}
async fn list_aux_files( async fn list_aux_files(
mut request: Request<Body>, mut request: Request<Body>,
_cancel: CancellationToken, _cancel: CancellationToken,
@@ -4154,6 +4180,7 @@ pub fn make_router(
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler)) .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler)) .put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization)) .get("/v1/utilization", |r| api_handler(r, get_utilization))
.get("/v1/list_tenant_visible_size", |r| api_handler(r, list_tenant_visible_size_handler))
.post( .post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files", "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files), |r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),

View File

@@ -2867,6 +2867,24 @@ pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(||
.expect("failed to define a metric") .expect("failed to define a metric")
}); });
// Global counter for PageStream request results by outcome. Outcomes are divided into 3 categories:
// - success
// - internal_error: errors that indicate bugs in the storage cluster (e.g. page reconstruction errors, misrouted requests, LSN timeout errors)
// - other_error: transient error conditions that are expected in normal operation or indicate bugs with other parts of the system (e.g. error due to pageserver shutdown, malformed requests etc.)
pub(crate) static PAGESTREAM_HANDLER_RESULTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_pagestream_handler_results_total",
"Number of pageserver pagestream handler results by outcome (success, internal_error, other_error)",
&["outcome"]
)
.expect("failed to define a metric")
});
// Constants for pageserver_pagestream_handler_results_total's outcome labels
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_SUCCESS: &str = "success";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR: &str = "internal_error";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR: &str = "other_error";
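For reference, the classification the handler applies can be summarized in one place (a hypothetical helper, not code in this change; the real increments are inlined in `PageServerHandler` in the page_service.rs hunk that follows):

```rust
// Sketch of the outcome mapping used by the pagestream handler.
fn pagestream_outcome(result: &Result<(), PageStreamError>) -> &'static str {
    match result {
        Ok(_) => PAGESTREAM_HANDLER_OUTCOME_SUCCESS,
        // Storage-cluster bugs: read/reconstruction failures, LSN wait
        // timeouts, and misrouted requests that force a reconnect.
        Err(PageStreamError::Read(_))
        | Err(PageStreamError::LsnTimeout(_))
        | Err(PageStreamError::Reconnect(_)) => PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
        // Expected in normal operation or caused elsewhere: shutdown,
        // missing objects, malformed requests.
        Err(_) => PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
    }
}
```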
// Metrics collected on WAL redo operations // Metrics collected on WAL redo operations
// //
// We collect the time spent in actual WAL redo ('redo'), and time waiting // We collect the time spent in actual WAL redo ('redo'), and time waiting

View File

@@ -70,7 +70,7 @@ use crate::context::{
}; };
use crate::metrics::{ use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS, self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics, MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
}; };
use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{ use crate::span::{
@@ -1441,20 +1441,57 @@ impl PageServerHandler {
let (response_msg, ctx) = match handler_result { let (response_msg, ctx) = match handler_result {
Err(e) => match &e.err { Err(e) => match &e.err {
PageStreamError::Shutdown => { PageStreamError::Shutdown => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
.inc();
// END HADRON
// If we fail to fulfil a request during shutdown, which may be _because_ of // If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the // shutdown, then do not send the error to the client. Instead just drop the
// connection. // connection.
span.in_scope(|| info!("dropping connection due to shutdown")); span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown); return Err(QueryError::Shutdown);
} }
PageStreamError::Reconnect(reason) => { PageStreamError::Reconnect(_reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}")); span.in_scope(|| {
// BEGIN HADRON
// We can get here because the compute node is pointing at the wrong PS. We
// already have a metric to keep track of this, so we suppress this log to
// reduce log spam. The information in this log message is not going to be
// that helpful given the volume of logs that can be generated.
// info!("handler requested reconnect: {reason}")
// END HADRON
});
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
// END HADRON
return Err(QueryError::Reconnect); return Err(QueryError::Reconnect);
} }
PageStreamError::Read(_) PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_) | PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_) | PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => { | PageStreamError::BadRequest(_) => {
// BEGIN HADRON
if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
} else {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
])
.inc();
}
// END HADRON
// print the all details to the log with {:#}, but for the client the // print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error // error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error. // here includes cancellation which is not an error.
@@ -1472,7 +1509,15 @@ impl PageServerHandler {
) )
} }
}, },
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)), Ok((response_msg, _op_timer_already_observed, ctx)) => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
.inc();
// END HADRON
(response_msg, Some(ctx))
}
}; };
let ctx = ctx.map(|req_ctx| { let ctx = ctx.map(|req_ctx| {
@@ -3421,9 +3466,12 @@ impl GrpcPageServiceHandler {
} }
/// Generates a PagestreamRequest header from a ReadLsn and request ID. /// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest { fn make_hdr(
read_lsn: page_api::ReadLsn,
req_id: Option<page_api::RequestID>,
) -> PagestreamRequest {
PagestreamRequest { PagestreamRequest {
reqid: req_id, reqid: req_id.map(|r| r.id).unwrap_or_default(),
request_lsn: read_lsn.request_lsn, request_lsn: read_lsn.request_lsn,
not_modified_since: read_lsn not_modified_since: read_lsn
.not_modified_since_lsn .not_modified_since_lsn
@@ -3533,7 +3581,7 @@ impl GrpcPageServiceHandler {
batch.push(BatchedGetPageRequest { batch.push(BatchedGetPageRequest {
req: PagestreamGetPageRequest { req: PagestreamGetPageRequest {
hdr: Self::make_hdr(req.read_lsn, req.request_id), hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
rel: req.rel, rel: req.rel,
blkno, blkno,
}, },
@@ -3563,12 +3611,16 @@ impl GrpcPageServiceHandler {
request_id: req.request_id, request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok, status_code: page_api::GetPageStatusCode::Ok,
reason: None, reason: None,
page_images: Vec::with_capacity(results.len()), rel: req.rel,
pages: Vec::with_capacity(results.len()),
}; };
for result in results { for result in results {
match result { match result {
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page), Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
block_number: r.req.blkno,
image: r.page,
}),
Ok((resp, _, _)) => { Ok((resp, _, _)) => {
return Err(tonic::Status::internal(format!( return Err(tonic::Status::internal(format!(
"unexpected response: {resp:?}" "unexpected response: {resp:?}"
@@ -3611,7 +3663,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn); span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamExistsRequest { let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, 0), hdr: Self::make_hdr(req.read_lsn, None),
rel: req.rel, rel: req.rel,
}; };
@@ -3761,7 +3813,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
let req = PagestreamDbSizeRequest { let req = PagestreamDbSizeRequest {
hdr: Self::make_hdr(req.read_lsn, 0), hdr: Self::make_hdr(req.read_lsn, None),
dbnode: req.db_oid, dbnode: req.db_oid,
}; };
@@ -3811,7 +3863,7 @@ impl proto::PageService for GrpcPageServiceHandler {
.await? .await?
.downgrade(); .downgrade();
while let Some(req) = reqs.message().await? { while let Some(req) = reqs.message().await? {
let req_id = req.request_id; let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span .instrument(span.clone()) // propagate request span
.await; .await;
@@ -3850,7 +3902,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn); span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamNblocksRequest { let req = PagestreamNblocksRequest {
hdr: Self::make_hdr(req.read_lsn, 0), hdr: Self::make_hdr(req.read_lsn, None),
rel: req.rel, rel: req.rel,
}; };
@@ -3883,7 +3935,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
let req = PagestreamGetSlruSegmentRequest { let req = PagestreamGetSlruSegmentRequest {
hdr: Self::make_hdr(req.read_lsn, 0), hdr: Self::make_hdr(req.read_lsn, None),
kind: req.kind as u8, kind: req.kind as u8,
segno: req.segno, segno: req.segno,
}; };

View File

@@ -5772,6 +5772,16 @@ impl TenantShard {
.unwrap_or(0) .unwrap_or(0)
} }
/// HADRON
/// Return the visible size of all timelines in this tenant.
pub(crate) fn get_visible_size(&self) -> u64 {
let timelines = self.timelines.lock().unwrap();
timelines
.values()
.map(|t| t.metrics.visible_physical_size_gauge.get())
.sum()
}
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant /// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`. /// manifest in `Self::remote_tenant_manifest`.
/// ///

View File

@@ -225,7 +225,7 @@ impl fmt::Display for ImageLayerName {
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers /// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path]) /// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])
/// and [`crate::tenant::storage_layer::layer::local_layer_path`]) /// and [`crate::tenant::storage_layer::layer::local_layer_path`])
#[derive(Debug, PartialEq, Eq, Hash, Clone)] #[derive(Debug, PartialEq, Eq, Hash, Clone, Ord, PartialOrd)]
pub enum LayerName { pub enum LayerName {
Image(ImageLayerName), Image(ImageLayerName),
Delta(DeltaLayerName), Delta(DeltaLayerName),

View File

@@ -1410,7 +1410,7 @@ pg_init_libpagestore(void)
"sharding stripe size", "sharding stripe size",
NULL, NULL,
&stripe_size, &stripe_size,
32768, 1, INT_MAX, 2048, 1, INT_MAX,
PGC_SIGHUP, PGC_SIGHUP,
GUC_UNIT_BLOCKS, GUC_UNIT_BLOCKS,
NULL, NULL, NULL); NULL, NULL, NULL);

View File

@@ -376,6 +376,18 @@ typedef struct PageserverFeedback
uint32 shard_number; uint32 shard_number;
} PageserverFeedback; } PageserverFeedback;
/* BEGIN_HADRON */
typedef struct WalRateLimiter
{
/* If the value is 1, PG backends will hit backpressure. */
pg_atomic_uint32 should_limit;
/* The number of bytes sent in the current second. */
uint64 sent_bytes;
/* The last recorded time in microsecond. */
TimestampTz last_recorded_time_us;
} WalRateLimiter;
/* END_HADRON */
typedef struct WalproposerShmemState typedef struct WalproposerShmemState
{ {
pg_atomic_uint64 propEpochStartLsn; pg_atomic_uint64 propEpochStartLsn;
@@ -395,6 +407,11 @@ typedef struct WalproposerShmemState
/* aggregated feedback with min LSNs across shards */ /* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback; PageserverFeedback min_ps_feedback;
/* BEGIN_HADRON */
/* The WAL rate limiter */
WalRateLimiter wal_rate_limiter;
/* END_HADRON */
} WalproposerShmemState; } WalproposerShmemState;
/* /*

View File

@@ -66,6 +66,9 @@ int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000; int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 3; int safekeeper_proto_version = 3;
char *safekeeper_conninfo_options = ""; char *safekeeper_conninfo_options = "";
/* BEGIN_HADRON */
int databricks_max_wal_mb_per_second = -1;
/* END_HADRON */
/* Set to true in the walproposer bgw. */ /* Set to true in the walproposer bgw. */
static bool am_walproposer; static bool am_walproposer;
@@ -252,6 +255,18 @@ nwp_register_gucs(void)
PGC_POSTMASTER, PGC_POSTMASTER,
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);
/* BEGIN_HADRON */
DefineCustomIntVariable(
"databricks.max_wal_mb_per_second",
"The maximum WAL MB per second allowed. If breached, sending WAL hit the backpressure. Setting to -1 disables the limit.",
NULL,
&databricks_max_wal_mb_per_second,
-1, -1, INT_MAX,
PGC_SUSET,
GUC_UNIT_MB,
NULL, NULL, NULL);
/* END_HADRON */
} }
@@ -393,6 +408,7 @@ assign_neon_safekeepers(const char *newval, void *extra)
static uint64 static uint64
backpressure_lag_impl(void) backpressure_lag_impl(void)
{ {
struct WalproposerShmemState* state = NULL;
if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
{ {
XLogRecPtr writePtr; XLogRecPtr writePtr;
@@ -426,6 +442,18 @@ backpressure_lag_impl(void)
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
} }
} }
/* BEGIN_HADRON */
if (databricks_max_wal_mb_per_second == -1) {
return 0;
}
state = GetWalpropShmemState();
if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1)
{
return 1;
}
/* END_HADRON */
return 0; return 0;
} }
@@ -472,6 +500,9 @@ WalproposerShmemInit(void)
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0); pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0); pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
} }
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
@@ -487,6 +518,9 @@ WalproposerShmemInit_SyncSafekeeper(void)
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0); pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0); pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
} }
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec #define BACK_PRESSURE_DELAY 10000L // 0.01 sec
@@ -521,7 +555,6 @@ backpressure_throttling_impl(void)
if (lag == 0) if (lag == 0)
return retry; return retry;
old_status = get_ps_display(&len); old_status = get_ps_display(&len);
new_status = (char *) palloc(len + 64 + 1); new_status = (char *) palloc(len + 64 + 1);
memcpy(new_status, old_status, len); memcpy(new_status, old_status, len);
@@ -1458,6 +1491,8 @@ XLogBroadcastWalProposer(WalProposer *wp)
{ {
XLogRecPtr startptr; XLogRecPtr startptr;
XLogRecPtr endptr; XLogRecPtr endptr;
struct WalproposerShmemState *state = NULL;
TimestampTz now = 0;
/* Start from the last sent position */ /* Start from the last sent position */
startptr = sentPtr; startptr = sentPtr;
@@ -1502,13 +1537,36 @@ XLogBroadcastWalProposer(WalProposer *wp)
* that arbitrary LSN is eventually reported as written, flushed and * that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time. * applied, so that it can measure the elapsed time.
*/ */
LagTrackerWrite(endptr, GetCurrentTimestamp()); now = GetCurrentTimestamp();
LagTrackerWrite(endptr, now);
/* Do we have any work to do? */ /* Do we have any work to do? */
Assert(startptr <= endptr); Assert(startptr <= endptr);
if (endptr <= startptr) if (endptr <= startptr)
return; return;
/* BEGIN_HADRON */
state = GetWalpropShmemState();
if (databricks_max_wal_mb_per_second != -1 && state != NULL)
{
uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
if (now - limiter->last_recorded_time_us > USECS_PER_SEC)
{
/* Reset the rate limiter */
limiter->last_recorded_time_us = now;
limiter->sent_bytes = 0;
pg_atomic_exchange_u32(&limiter->should_limit, 0);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > max_wal_bytes)
{
pg_atomic_exchange_u32(&limiter->should_limit, 1);
}
}
/* END_HADRON */
WalProposerBroadcast(wp, startptr, endptr); WalProposerBroadcast(wp, startptr, endptr);
sentPtr = endptr; sentPtr = endptr;
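The limiter added above works as a fixed one-second window: bytes sent are accumulated per window, `should_limit` is raised once the configured budget is exceeded, and both are reset when a new window starts (backpressure_lag_impl() then reports a non-zero lag while the flag is set). A minimal Rust sketch of the same logic, mirroring the `WalRateLimiter` fields but not taken from the walproposer code:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

const USECS_PER_SEC: i64 = 1_000_000;

// Field names mirror the C struct; this is an illustrative sketch only.
struct WalRateLimiter {
    should_limit: AtomicU32,    // read by backends to decide on backpressure
    sent_bytes: u64,            // bytes sent in the current one-second window
    last_recorded_time_us: i64, // start of the current window, in microseconds
}

impl WalRateLimiter {
    fn record_send(&mut self, now_us: i64, bytes: u64, max_bytes_per_sec: u64) {
        if now_us - self.last_recorded_time_us > USECS_PER_SEC {
            // New window: reset the byte counter and clear the backpressure flag.
            self.last_recorded_time_us = now_us;
            self.sent_bytes = 0;
            self.should_limit.store(0, Ordering::Relaxed);
        }
        self.sent_bytes += bytes;
        if self.sent_bytes > max_bytes_per_sec {
            // Over budget for this window: signal backpressure until the next reset.
            self.should_limit.store(1, Ordering::Relaxed);
        }
    }
}
```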

View File

@@ -59,6 +59,15 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
.expect("Failed to register safekeeper_flush_wal_seconds histogram") .expect("Failed to register safekeeper_flush_wal_seconds histogram")
}); });
/* BEGIN_HADRON */ /* BEGIN_HADRON */
// Counter of all ProposerAcceptorMessage requests received
pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_proposer_acceptor_messages_total",
"Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
&["outcome"]
)
.expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
});
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| { pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!( register_int_counter!(
"safekeeper_wal_disk_io_errors", "safekeeper_wal_disk_io_errors",

View File

@@ -24,7 +24,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn; use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback; use utils::pageserver_feedback::PageserverFeedback;
use crate::metrics::MISC_OPERATION_SECONDS; use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
use crate::state::TimelineState; use crate::state::TimelineState;
use crate::{control_file, wal_storage}; use crate::{control_file, wal_storage};
@@ -938,7 +938,7 @@ where
&mut self, &mut self,
msg: &ProposerAcceptorMessage, msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> { ) -> Result<Option<AcceptorProposerMessage>> {
match msg { let res = match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await, ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await, ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await, ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
@@ -949,7 +949,20 @@ where
self.handle_append_request(msg, false).await self.handle_append_request(msg, false).await
} }
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await, ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
} };
// BEGIN HADRON
match &res {
Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
.with_label_values(&["success"])
.inc(),
Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
.with_label_values(&["error"])
.inc(),
};
res
// END HADRON
} }
/// Handle initial message from proposer: check its sanity and send my /// Handle initial message from proposer: check its sanity and send my

View File

@@ -333,6 +333,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys") res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
self.verbose_error(res) self.verbose_error(res)
def list_tenant_visible_size(self) -> dict[TenantShardId, int]:
res = self.get(f"http://localhost:{self.port}/v1/list_tenant_visible_size")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_list(self) -> list[dict[Any, Any]]: def tenant_list(self) -> list[dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant") res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res) self.verbose_error(res)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import re import re
import socket import socket
from contextlib import closing from contextlib import closing
from itertools import cycle
from fixtures.log_helper import log from fixtures.log_helper import log
@@ -34,15 +35,23 @@ def can_bind(host: str, port: int) -> bool:
class PortDistributor: class PortDistributor:
def __init__(self, base_port: int, port_number: int): def __init__(self, base_port: int, port_number: int):
self.iterator = iter(range(base_port, base_port + port_number)) self.base_port = base_port
self.port_number = port_number
self.cycle = cycle(range(base_port, base_port + port_number))
self.port_map: dict[int, int] = {} self.port_map: dict[int, int] = {}
def get_port(self) -> int: def get_port(self) -> int:
for port in self.iterator: checked = 0
for port in self.cycle:
if can_bind("localhost", port): if can_bind("localhost", port):
return port return port
elif checked < self.port_number:
checked += 1
else:
break
raise RuntimeError( raise RuntimeError(
"port range configured for test is exhausted, consider enlarging the range" f"port range ({self.base_port}..{self.base_port + self.port_number}) configured for test is exhausted, consider enlarging the range"
) )
def replace_with_new_port(self, value: int | str) -> int | str: def replace_with_new_port(self, value: int | str) -> int | str:

View File

@@ -40,7 +40,7 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any: def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any:
if method == PrewarmMethod.POSTGRES: if method == PrewarmMethod.POSTGRES:
cur.execute("select get_local_cache_state()") cur.execute("select neon.get_local_cache_state()")
return cur.fetchall()[0][0] return cur.fetchall()[0][0]
if method == PrewarmMethod.AUTOPREWARM: if method == PrewarmMethod.AUTOPREWARM:
@@ -72,7 +72,7 @@ def prewarm_endpoint(
elif method == PrewarmMethod.COMPUTE_CTL: elif method == PrewarmMethod.COMPUTE_CTL:
client.prewarm_lfc() client.prewarm_lfc()
elif method == PrewarmMethod.POSTGRES: elif method == PrewarmMethod.POSTGRES:
cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
def check_prewarmed( def check_prewarmed(
@@ -116,7 +116,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
pg_conn = endpoint.connect() pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor() pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon") pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("create database lfc") pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc") lfc_conn = endpoint.connect(dbname="lfc")
@@ -142,10 +142,12 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
lfc_cur = lfc_conn.cursor() lfc_cur = lfc_conn.cursor()
prewarm_endpoint(method, client, pg_cur, lfc_state) prewarm_endpoint(method, client, pg_cur, lfc_state)
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") pg_cur.execute(
"select lfc_value from neon.neon_lfc_stats where lfc_key='file_cache_used_pages'"
)
lfc_used_pages = pg_cur.fetchall()[0][0] lfc_used_pages = pg_cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}") log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from get_prewarm_info()") pg_cur.execute("select * from neon.get_prewarm_info()")
total, prewarmed, skipped, _ = pg_cur.fetchall()[0] total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}") log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
progress = (prewarmed + skipped) * 100 // total progress = (prewarmed + skipped) * 100 // total
@@ -186,7 +188,7 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
pg_conn = endpoint.connect() pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor() pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon") pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("CREATE DATABASE lfc") pg_cur.execute("CREATE DATABASE lfc")
lfc_conn = endpoint.connect(dbname="lfc") lfc_conn = endpoint.connect(dbname="lfc")

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import ( from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME, DEFAULT_BRANCH_NAME,
NeonEnv, NeonEnv,
@@ -164,3 +165,15 @@ def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder
{"rel_size_migration": "legacy"}, {"rel_size_migration": "legacy"},
) )
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy" assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy"
def test_pageserver_get_tenant_visible_size(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_start()
env.create_tenant(shard_count=4)
env.create_tenant(shard_count=2)
json = env.pageserver.http_client().list_tenant_visible_size()
log.info(f"{json}")
# initial tenant (1 shard) + 2 newly created tenants (4 + 2 shards) = 7 shards
assert len(json) == 7

View File

@@ -60,7 +60,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
with primary.connect() as primary_conn: with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor() primary_cur = primary_conn.cursor()
primary_cur.execute("create extension neon") primary_cur.execute("create schema neon;create extension neon with schema neon")
primary_cur.execute( primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)" "create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
) )
@@ -172,7 +172,7 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as conn, conn.cursor() as cur: with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create extension neon") cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)") cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
cur.execute("show neon.safekeepers") cur.execute("show neon.safekeepers")