diff --git a/Cargo.lock b/Cargo.lock
index 55ea89149e..7edc974e93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4499,7 +4499,9 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "compute_api",
  "futures",
+ "pageserver_api",
  "pageserver_page_api",
  "tokio",
  "tokio-stream",
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 202834dbb4..d34774aed0 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -444,7 +444,7 @@ pub struct JwksSettings {
 }

 /// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
-#[derive(Clone, Copy, Debug, Default)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
 pub enum PageserverProtocol {
     /// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
     #[default]
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index fd8931bcda..a9b75060e7 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -387,7 +387,7 @@ pub struct SafekeepersInfo {
     pub safekeepers: Vec<SafekeeperInfo>,
 }

-#[derive(Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone, Debug)]
 pub struct SafekeeperInfo {
     pub id: NodeId,
     pub hostname: String,
diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 5a13aace64..d6f4cd5e66 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -332,7 +332,11 @@ fn hash_combine(mut a: u32, mut b: u32) -> u32 {
 ///
 /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
 /// and will be handled at higher levels when shards are split.
-fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
+pub fn key_to_shard_number(
+    count: ShardCount,
+    stripe_size: ShardStripeSize,
+    key: &Key,
+) -> ShardNumber {
     // Fast path for un-sharded tenants or broadcast keys
     if count < ShardCount(2) || key_is_shard0(key) {
         return ShardNumber(0);
diff --git a/libs/utils/src/shard.rs b/libs/utils/src/shard.rs
index f2b81373e2..5a0edf8cea 100644
--- a/libs/utils/src/shard.rs
+++ b/libs/utils/src/shard.rs
@@ -171,6 +171,12 @@ impl std::fmt::Display for ShardNumber {
     }
 }

+impl std::fmt::Display for ShardCount {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
 impl std::fmt::Display for ShardSlug<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index 5a3a2761c2..0a8bcad2ef 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -6,7 +6,9 @@ license.workspace = true

 [dependencies]
 anyhow.workspace = true
+compute_api.workspace = true
 futures.workspace = true
+pageserver_api.workspace = true
 pageserver_page_api.workspace = true
 tokio.workspace = true
 tokio-stream.workspace = true
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
new file mode 100644
index 0000000000..5bccdeede3
--- /dev/null
+++ b/pageserver/client_grpc/src/client.rs
@@ -0,0 +1,303 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use tracing::instrument;
+
+use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
+use crate::retry::Retry;
+use compute_api::spec::PageserverProtocol;
+use pageserver_api::key::{Key, rel_block_to_key};
+use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
+use pageserver_page_api as page_api;
+use utils::id::{TenantId, TimelineId};
+use utils::shard::{ShardCount, ShardIndex, ShardNumber};
+
+/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
+/// basic `page_api::Client` gRPC client, and supports:
+///
+/// * Sharded tenants across multiple Pageservers.
+/// * Pooling of connections, clients, and streams for efficient resource use.
+/// * Concurrent use by many callers.
+/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
+/// * Automatic retries.
+/// * Observability.
+///
+/// TODO: this client does not support base backups or LSN leases, as these are only used by
+/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
+pub struct PageserverClient {
+    // TODO: support swapping out the shard map, e.g. via an ArcSwap.
+    shards: Shards,
+    retry: Retry,
+}
+
+impl PageserverClient {
+    /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
+    /// in the shard map, which must be complete and must use gRPC URLs.
+    pub fn new(
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        shard_map: HashMap<ShardIndex, String>,
+        stripe_size: ShardStripeSize,
+        auth_token: Option<String>,
+    ) -> anyhow::Result<Self> {
+        let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
+        Ok(Self {
+            shards,
+            retry: Retry,
+        })
+    }
+
+    /// Returns whether a relation exists.
+    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
+    pub async fn check_rel_exists(
+        &self,
+        req: page_api::CheckRelExistsRequest,
+    ) -> tonic::Result<page_api::CheckRelExistsResponse> {
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.check_rel_exists(req).await
+            })
+            .await
+    }
+
+    /// Returns the total size of a database, as # of bytes.
+    #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
+    pub async fn get_db_size(
+        &self,
+        req: page_api::GetDbSizeRequest,
+    ) -> tonic::Result<page_api::GetDbSizeResponse> {
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_db_size(req).await
+            })
+            .await
+    }
+
+    /// Fetches a page. The `request_id` must be unique across all in-flight requests.
+    ///
+    /// Unlike the `page_api::Client`, this client automatically converts `status_code` into
+    /// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
+    #[instrument(skip_all, fields(
+        req_id = %req.request_id,
+        rel = %req.rel,
+        blkno = %req.block_numbers[0],
+        blks = %req.block_numbers.len(),
+        lsn = %req.read_lsn,
+    ))]
+    pub async fn get_page(
+        &self,
+        req: page_api::GetPageRequest,
+    ) -> tonic::Result<page_api::GetPageResponse> {
+        // TODO: this needs to split batch requests across shards and reassemble responses into a
+        // single response. It must also re-split the batch in case the shard map changes. For now,
+        // just use the first page.
+        let key = rel_block_to_key(
+            req.rel,
+            req.block_numbers
+                .first()
+                .copied()
+                .ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
+        );
+
+        self.retry
+            .with(async || {
+                let stream = self.shards.get_for_key(key).stream().await;
+                let resp = stream.send(req.clone()).await?;
+
+                if resp.status_code != page_api::GetPageStatusCode::Ok {
+                    return Err(tonic::Status::new(
+                        resp.status_code.into(),
+                        resp.reason.unwrap_or_else(|| String::from("unknown error")),
+                    ));
+                }
+
+                Ok(resp)
+            })
+            .await
+    }
+
+    /// Returns the size of a relation, as # of blocks.
+    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
+    pub async fn get_rel_size(
+        &self,
+        req: page_api::GetRelSizeRequest,
+    ) -> tonic::Result<page_api::GetRelSizeResponse> {
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_rel_size(req).await
+            })
+            .await
+    }
+
+    /// Fetches an SLRU segment.
+    #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
+    pub async fn get_slru_segment(
+        &self,
+        req: page_api::GetSlruSegmentRequest,
+    ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
+        self.retry
+            .with(async || {
+                // SLRU segments are only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_slru_segment(req).await
+            })
+            .await
+    }
+}
+
+/// Tracks the tenant's shards.
+struct Shards {
+    /// The shard count.
+    ///
+    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
+    count: ShardCount,
+    /// The stripe size. Only used for sharded tenants.
+    stripe_size: ShardStripeSize,
+    /// Shards by shard index.
+    ///
+    /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
+    ///
+    /// INVARIANT: every shard 0..count is present.
+    /// INVARIANT: shard 0 is always present.
+    map: HashMap<ShardIndex, Shard>,
+}
+
+impl Shards {
+    /// Creates a new set of shards based on a shard map.
+    fn new(
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        shard_map: HashMap<ShardIndex, String>,
+        stripe_size: ShardStripeSize,
+        auth_token: Option<String>,
+    ) -> anyhow::Result<Self> {
+        let count = match shard_map.len() {
+            0 => return Err(anyhow!("no shards provided")),
+            1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
+            n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
+            n => ShardCount::new(n as u8),
+        };
+
+        let mut map = HashMap::new();
+        for (shard_id, url) in shard_map {
+            // The shard index must match the computed shard count, even for unsharded tenants.
+            if shard_id.shard_count != count {
+                return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
+            }
+            // The shard index's number and count must be consistent.
+            if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
+                return Err(anyhow!("invalid shard index {shard_id}"));
+            }
+            // The above conditions guarantee that we have all shards 0..count: len() matches count,
+            // shard number < count, and numbers are unique (via hashmap).
+            let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
+            map.insert(shard_id, shard);
+        }
+
+        Ok(Self {
+            count,
+            stripe_size,
+            map,
+        })
+    }
+
+    /// Looks up the given shard.
+    #[allow(clippy::result_large_err)] // TODO: check perf impact
+    fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
+        self.map
+            .get(&shard_id)
+            .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
+    }
+
+    /// Looks up the shard that owns the given key.
+    fn get_for_key(&self, key: Key) -> &Shard {
+        let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
+        self.get(ShardIndex::new(shard_number, self.count))
+            .expect("must exist")
+    }
+
+    /// Returns shard 0.
+    fn get_zero(&self) -> &Shard {
+        self.get(ShardIndex::new(ShardNumber(0), self.count))
+            .expect("always present")
+    }
+}
+
+/// A single shard.
+///
+/// TODO: consider separate pools for normal and bulk traffic, with different settings.
+struct Shard {
+    /// Dedicated channel pool for this shard. Shared by all clients/streams in this shard.
+    _channel_pool: Arc<ChannelPool>,
+    /// Unary gRPC client pool for this shard. Uses the shared channel pool.
+    client_pool: Arc<ClientPool>,
+    /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel
+    /// pool with unary clients.
+    stream_pool: Arc<StreamPool>,
+}
+
+impl Shard {
+    /// Creates a new shard. It has its own dedicated resource pools.
+    fn new(
+        url: String,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        shard_id: ShardIndex,
+        auth_token: Option<String>,
+    ) -> anyhow::Result<Self> {
+        // Sanity-check that the URL uses gRPC.
+        if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
+            return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
+        }
+
+        // Use a common channel pool for all clients, to multiplex unary and stream requests across
+        // the same TCP connections. The channel pool is unbounded (but client pools are bounded).
+        let channel_pool = ChannelPool::new(url)?;
+
+        // Dedicated client pool for unary requests.
+        let client_pool = ClientPool::new(
+            channel_pool.clone(),
+            tenant_id,
+            timeline_id,
+            shard_id,
+            auth_token.clone(),
+        );
+
+        // Stream pool with dedicated client pool. If this shared a client pool with unary requests,
+        // long-lived streams could fill up the client pool and starve out unary requests. It shares
+        // the same underlying channel pool with unary clients though, which is unbounded.
+        let stream_pool = StreamPool::new(ClientPool::new(
+            channel_pool.clone(),
+            tenant_id,
+            timeline_id,
+            shard_id,
+            auth_token,
+        ));
+
+        Ok(Self {
+            _channel_pool: channel_pool,
+            client_pool,
+            stream_pool,
+        })
+    }
+
+    /// Returns a pooled client for this shard.
+    async fn client(&self) -> tonic::Result<ClientGuard> {
+        self.client_pool
+            .get()
+            .await
+            .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
+    }
+
+    /// Returns a pooled stream for this shard.
+    async fn stream(&self) -> StreamGuard {
+        self.stream_pool.get().await
+    }
+}
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index c900e1a939..2a59f9868c 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -1,14 +1,5 @@
-//! A rich Pageserver gRPC client. This client is more capable than the basic `page_api::Client`
-//! gRPC client, and supports:
-//!
-//! * Sharded tenants across multiple Pageservers.
-//! * Pooling of connections, clients, and streams for efficient resource use.
-//! * Concurrent use by many callers.
-//! * Internal handling of GetPage bidirectional streams.
-//! * Automatic retries.
-//! * Observability.
-//!
-//! The client is under development, this package is just a shell.
-
-#[allow(unused)]
+mod client;
 mod pool;
+mod retry;
+
+pub use client::PageserverClient;
diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs
new file mode 100644
index 0000000000..b0473204d7
--- /dev/null
+++ b/pageserver/client_grpc/src/retry.rs
@@ -0,0 +1,151 @@
+use std::time::Duration;
+
+use tokio::time::Instant;
+use tracing::{error, info, warn};
+
+use utils::backoff::exponential_backoff_duration;
+
+/// A retry handler for Pageserver gRPC requests.
+///
+/// This is used instead of backoff::retry for better control and observability.
+pub struct Retry;
+
+impl Retry {
+    /// The per-request timeout.
+    // TODO: tune these, and/or make them configurable. Should we retry forever?
+    const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+    /// The total timeout across all attempts.
+    const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
+    /// The initial backoff duration.
+    const BASE_BACKOFF: Duration = Duration::from_millis(10);
+    /// The maximum backoff duration.
+    const MAX_BACKOFF: Duration = Duration::from_secs(10);
+    /// If true, log successful requests. For debugging.
+    const LOG_SUCCESS: bool = false;
+
+    /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
+    /// using the current tracing span for context.
+    ///
+    /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
+    /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
+    pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
+    where
+        F: FnMut() -> O,
+        O: Future<Output = tonic::Result<T>>,
+    {
+        let started = Instant::now();
+        let deadline = started + Self::TOTAL_TIMEOUT;
+        let mut last_error = None;
+        let mut retries = 0;
+        loop {
+            // Set up a future to wait for the backoff (if any) and run the request with a timeout.
+            let backoff_and_try = async {
+                // NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
+                // https://github.com/tokio-rs/tokio/issues/6866
+                if let Some(backoff) = Self::backoff_duration(retries) {
+                    tokio::time::sleep(backoff).await;
+                }
+
+                let request_started = Instant::now();
+                tokio::time::timeout(Self::REQUEST_TIMEOUT, f())
+                    .await
+                    .map_err(|_| {
+                        tonic::Status::deadline_exceeded(format!(
+                            "request timed out after {:.3}s",
+                            request_started.elapsed().as_secs_f64()
+                        ))
+                    })?
+            };
+
+            // Wait for the backoff and request, or bail out if the total timeout is exceeded.
+            let result = tokio::select! {
+                result = backoff_and_try => result,
+
+                _ = tokio::time::sleep_until(deadline) => {
+                    let last_error = last_error.unwrap_or_else(|| {
+                        tonic::Status::deadline_exceeded(format!(
+                            "request timed out after {:.3}s",
+                            started.elapsed().as_secs_f64()
+                        ))
+                    });
+                    error!(
+                        "giving up after {:.3}s and {retries} retries, last error {:?}: {}",
+                        started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
+                    );
+                    return Err(last_error);
+                }
+            };
+
+            match result {
+                // Success, return the result.
+                Ok(result) => {
+                    if retries > 0 || Self::LOG_SUCCESS {
+                        info!(
+                            "request succeeded after {retries} retries in {:.3}s",
+                            started.elapsed().as_secs_f64(),
+                        );
+                    }
+
+                    return Ok(result);
+                }
+
+                // Error, retry or bail out.
+                Err(status) => {
+                    let (code, message) = (status.code(), status.message());
+                    let attempt = retries + 1;
+
+                    if !Self::should_retry(code) {
+                        // NB: include the attempt here too. This isn't necessarily the first
+                        // attempt, because the error may change between attempts.
+                        error!(
+                            "request failed with {code:?}: {message}, not retrying (attempt {attempt})"
+                        );
+                        return Err(status);
+                    }
+
+                    warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
+
+                    retries += 1;
+                    last_error = Some(status);
+                }
+            }
+        }
+    }
+
+    /// Returns the backoff duration for the given retry attempt, or None for no backoff.
+    fn backoff_duration(retry: usize) -> Option<Duration> {
+        let backoff = exponential_backoff_duration(
+            retry as u32,
+            Self::BASE_BACKOFF.as_secs_f64(),
+            Self::MAX_BACKOFF.as_secs_f64(),
+        );
+        (!backoff.is_zero()).then_some(backoff)
+    }
+
+    /// Returns true if the given status code should be retried.
+    fn should_retry(code: tonic::Code) -> bool {
+        match code {
+            tonic::Code::Ok => panic!("unexpected Ok status code"),
+
+            // These codes are transient, so retry them.
+            tonic::Code::Aborted => true,
+            tonic::Code::Cancelled => true,
+            tonic::Code::DeadlineExceeded => true, // maybe transient slowness
+            tonic::Code::Internal => true, // maybe transient failure?
+            tonic::Code::ResourceExhausted => true,
+            tonic::Code::Unavailable => true,
+
+            // The following codes will likely continue to fail, so don't retry.
+            tonic::Code::AlreadyExists => false,
+            tonic::Code::DataLoss => false,
+            tonic::Code::FailedPrecondition => false,
+            tonic::Code::InvalidArgument => false,
+            tonic::Code::NotFound => false,
+            tonic::Code::OutOfRange => false,
+            tonic::Code::PermissionDenied => false,
+            tonic::Code::Unauthenticated => false,
+            tonic::Code::Unimplemented => false,
+            tonic::Code::Unknown => false,
+        }
+    }
+}
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 898a1cb884..e6753f1346 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -602,6 +602,21 @@ impl TryFrom for GetPageStatusCode {
     }
 }

+impl From<GetPageStatusCode> for tonic::Code {
+    fn from(status_code: GetPageStatusCode) -> Self {
+        use tonic::Code;
+
+        match status_code {
+            GetPageStatusCode::Unknown => Code::Unknown,
+            GetPageStatusCode::Ok => Code::Ok,
+            GetPageStatusCode::NotFound => Code::NotFound,
+            GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
+            GetPageStatusCode::InternalError => Code::Internal,
+            GetPageStatusCode::SlowDown => Code::ResourceExhausted,
+        }
+    }
+}
+
 // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
 // shards will error.
 #[derive(Clone, Copy, Debug)]
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index e2228f57a8..e404d212b8 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -3481,6 +3481,8 @@ impl GrpcPageServiceHandler {
     /// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
     /// GetPageResponse with an appropriate status code to avoid terminating the stream.
     ///
+    /// TODO: verify that the requested pages belong to this shard.
+    ///
     /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
     /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
     /// split them up in the client or server.
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 403ae15b59..ed6643d641 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1677,7 +1677,21 @@ impl Service {
             .collect::>>()?;
         let safekeepers: HashMap =
             safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
-        tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
+        let count_policy = |policy| {
+            safekeepers
+                .iter()
+                .filter(|sk| sk.1.scheduling_policy() == policy)
+                .count()
+        };
+        let active_sk_count = count_policy(SkSchedulingPolicy::Active);
+        let activating_sk_count = count_policy(SkSchedulingPolicy::Activating);
+        let pause_sk_count = count_policy(SkSchedulingPolicy::Pause);
+        let decom_sk_count = count_policy(SkSchedulingPolicy::Decomissioned);
+        tracing::info!(
+            "Loaded {} safekeepers from database. Active {active_sk_count}, activating {activating_sk_count}, \
+            paused {pause_sk_count}, decommissioned {decom_sk_count}.",
+            safekeepers.len()
+        );
         metrics::METRICS_REGISTRY
             .metrics_group
             .storage_controller_safekeeper_nodes
@@ -1969,6 +1983,14 @@ impl Service {
             }
         });

+        // Check that there are enough safekeepers configured for us to create new timelines.
+        let test_sk_res = this.safekeepers_for_new_timeline().await;
+        tracing::info!(
+            timeline_safekeeper_count = config.timeline_safekeeper_count,
+            timelines_onto_safekeepers = config.timelines_onto_safekeepers,
+            "viability test result (test timeline creation on safekeepers): {test_sk_res:?}",
+        );
+
         Ok(this)
     }
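For reviewers who want to see how the new API is meant to be driven, here is a rough usage sketch. It is not part of this diff: the URL, tenant/timeline IDs, and stripe size below are invented placeholders, and the actual compute-side wiring is not shown in this PR.

// Illustrative only; placeholder values, not code from this change.
use std::collections::HashMap;

use pageserver_api::shard::ShardStripeSize;
use pageserver_client_grpc::PageserverClient;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

async fn example() -> anyhow::Result<()> {
    // Unsharded tenant: a single entry keyed by ShardIndex::unsharded(), as Shards::new() expects.
    let mut shard_map = HashMap::new();
    shard_map.insert(
        ShardIndex::unsharded(),
        "grpc://pageserver-0.example.invalid:51051".to_string(), // placeholder gRPC URL
    );

    let client = PageserverClient::new(
        TenantId::generate(),   // placeholder; normally the tenant being served
        TimelineId::generate(), // placeholder; normally the compute's timeline
        shard_map,
        ShardStripeSize(32768), // ignored for unsharded tenants
        None,                   // no auth token
    )?;

    // Metadata requests (check_rel_exists, get_rel_size, get_db_size, get_slru_segment) always go
    // to shard 0; get_page() routes by key via key_to_shard_number() and retries transient errors
    // according to Retry::should_retry().
    let _ = client;
    Ok(())
}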