diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index e6a90fb582..b8ee57bf9f 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -14,9 +14,9 @@ use utils::logging::warn_slow; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; use crate::retry::Retry; -use crate::split::GetPageSplitter; use compute_api::spec::PageserverProtocol; use pageserver_page_api as page_api; +use pageserver_page_api::GetPageSplitter; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize}; diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 14fb3fbd5a..4999fd3d0a 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -1,6 +1,5 @@ mod client; mod pool; mod retry; -mod split; pub use client::{PageserverClient, ShardSpec}; diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index e78f6ce206..b44df6337f 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -19,7 +19,9 @@ pub mod proto { } mod client; -pub use client::Client; mod model; +mod split; +pub use client::Client; pub use model::*; +pub use split::GetPageSplitter; diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/page_api/src/split.rs similarity index 91% rename from pageserver/client_grpc/src/split.rs rename to pageserver/page_api/src/split.rs index 8631638686..5ecc90a166 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/page_api/src/split.rs @@ -3,18 +3,18 @@ use std::collections::HashMap; use anyhow::anyhow; use bytes::Bytes; +use crate::model::*; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::key_to_shard_number; -use pageserver_page_api as page_api; use utils::shard::{ShardCount, ShardIndex, ShardStripeSize}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. pub struct GetPageSplitter { /// Split requests by shard index. - requests: HashMap, + requests: HashMap, /// The response being assembled. Preallocated with empty pages, to be filled in. - response: page_api::GetPageResponse, + response: GetPageResponse, /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used /// to assemble the response pages in the same order as the original request. block_shards: Vec, @@ -24,7 +24,7 @@ impl GetPageSplitter { /// Checks if the given request only touches a single shard, and returns the shard ID. This is /// the common case, so we check first in order to avoid unnecessary allocations and overhead. pub fn for_single_shard( - req: &page_api::GetPageRequest, + req: &GetPageRequest, count: ShardCount, stripe_size: Option, ) -> anyhow::Result> { @@ -57,7 +57,7 @@ impl GetPageSplitter { /// Splits the given request. pub fn split( - req: page_api::GetPageRequest, + req: GetPageRequest, count: ShardCount, stripe_size: Option, ) -> anyhow::Result { @@ -84,7 +84,7 @@ impl GetPageSplitter { requests .entry(shard_id) - .or_insert_with(|| page_api::GetPageRequest { + .or_insert_with(|| GetPageRequest { request_id: req.request_id, request_class: req.request_class, rel: req.rel, @@ -98,16 +98,16 @@ impl GetPageSplitter { // Construct a response to be populated by shard responses. Preallocate empty page slots // with the expected block numbers. - let response = page_api::GetPageResponse { + let response = GetPageResponse { request_id: req.request_id, - status_code: page_api::GetPageStatusCode::Ok, + status_code: GetPageStatusCode::Ok, reason: None, rel: req.rel, pages: req .block_numbers .into_iter() .map(|block_number| { - page_api::Page { + Page { block_number, image: Bytes::new(), // empty page slot to be filled in } @@ -123,9 +123,7 @@ impl GetPageSplitter { } /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. - pub fn drain_requests( - &mut self, - ) -> impl Iterator { + pub fn drain_requests(&mut self) -> impl Iterator { self.requests.drain() } @@ -135,10 +133,10 @@ impl GetPageSplitter { pub fn add_response( &mut self, shard_id: ShardIndex, - response: page_api::GetPageResponse, + response: GetPageResponse, ) -> anyhow::Result<()> { // The caller should already have converted status codes into tonic::Status. - if response.status_code != page_api::GetPageStatusCode::Ok { + if response.status_code != GetPageStatusCode::Ok { return Err(anyhow!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, @@ -209,7 +207,7 @@ impl GetPageSplitter { /// Fetches the final, assembled response. #[allow(clippy::result_large_err)] - pub fn get_response(self) -> anyhow::Result { + pub fn get_response(self) -> anyhow::Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b3bc42a55e..f16046657a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -16,7 +16,8 @@ use anyhow::{Context as _, bail}; use bytes::{Buf as _, BufMut as _, BytesMut}; use chrono::Utc; use futures::future::BoxFuture; -use futures::{FutureExt, Stream}; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt as _}; use itertools::Itertools; use jsonwebtoken::TokenData; use once_cell::sync::OnceCell; @@ -35,8 +36,8 @@ use pageserver_api::pagestream_api::{ }; use pageserver_api::reltag::SlruKind; use pageserver_api::shard::TenantShardId; -use pageserver_page_api as page_api; use pageserver_page_api::proto; +use pageserver_page_api::{self as page_api, GetPageSplitter}; use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; @@ -3423,18 +3424,6 @@ impl GrpcPageServiceHandler { Ok(CancellableTask { task, cancel }) } - /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of - /// relations and their sizes, as well as SLRU segments and similar data. - #[allow(clippy::result_large_err)] - fn ensure_shard_zero(timeline: &Handle) -> Result<(), tonic::Status> { - match timeline.get_shard_index().shard_number.0 { - 0 => Ok(()), - shard => Err(tonic::Status::invalid_argument(format!( - "request must execute on shard zero (is shard {shard})", - ))), - } - } - /// Generates a PagestreamRequest header from a ReadLsn and request ID. fn make_hdr( read_lsn: page_api::ReadLsn, @@ -3449,30 +3438,72 @@ impl GrpcPageServiceHandler { } } - /// Acquires a timeline handle for the given request. + /// Acquires a timeline handle for the given request. The shard index must match a local shard. /// - /// TODO: during shard splits, the compute may still be sending requests to the parent shard - /// until the entire split is committed and the compute is notified. Consider installing a - /// temporary shard router from the parent to the children while the split is in progress. - /// - /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage - /// the TimelineHandles lifecycle. - /// - /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid - /// the unnecessary overhead. + /// NB: this will fail during shard splits, see comment on [`Self::maybe_split_get_page`]. async fn get_request_timeline( &self, req: &tonic::Request, ) -> Result, GetActiveTimelineError> { - let ttid = *extract::(req); + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); let shard_index = *extract::(req); - let shard_selector = ShardSelector::Known(shard_index); + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. TimelineHandles::new(self.tenant_manager.clone()) - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) .await } + /// Acquires a timeline handle for the given request, which must be for shard zero. Most + /// metadata requests are only valid on shard zero. + /// + /// NB: during an ongoing shard split, the compute will keep talking to the parent shard until + /// the split is committed, but the parent shard may have been removed in the meanwhile. In that + /// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`]. + /// + /// TODO: revamp the split protocol to avoid this child routing. + async fn get_request_timeline_shard_zero( + &self, + req: &tonic::Request, + ) -> Result, tonic::Status> { + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(req); + let shard_index = *extract::(req); + + if shard_index.shard_number.0 != 0 { + return Err(tonic::Status::invalid_argument(format!( + "request only valid on shard zero (requested shard {shard_index})", + ))); + } + + // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to + // avoid the unnecessary overhead. + let mut handles = TimelineHandles::new(self.tenant_manager.clone()); + match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + Ok(timeline) => Ok(timeline), + Err(err) => { + // We may be in the middle of a shard split. Try to find a child shard 0. + if let Ok(timeline) = handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await + && timeline.get_shard_index().shard_count > shard_index.shard_count + { + return Ok(timeline); + } + Err(err.into()) + } + } + } + /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start. /// Only errors if the timeline is shutting down. /// @@ -3502,28 +3533,22 @@ impl GrpcPageServiceHandler { /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or /// split them up in the client or server. - #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))] + #[instrument(skip_all, fields( + req_id = %req.request_id, + rel = %req.rel, + blkno = %req.block_numbers[0], + blks = %req.block_numbers.len(), + lsn = %req.read_lsn, + ))] async fn get_page( ctx: &RequestContext, - timeline: &WeakHandle, - req: proto::GetPageRequest, + timeline: Handle, + req: page_api::GetPageRequest, io_concurrency: IoConcurrency, - ) -> Result { - let received_at = Instant::now(); - let timeline = timeline.upgrade()?; + received_at: Instant, + ) -> Result { let ctx = ctx.with_scope_page_service_pagestream(&timeline); - // Validate the request, decorate the span, and convert it to a Pagestream request. - let req = page_api::GetPageRequest::try_from(req)?; - - span_record!( - req_id = %req.request_id, - rel = %req.rel, - blkno = %req.block_numbers[0], - blks = %req.block_numbers.len(), - lsn = %req.read_lsn, - ); - for &blkno in &req.block_numbers { let shard = timeline.get_shard_identity(); let key = rel_block_to_key(req.rel, blkno); @@ -3611,7 +3636,95 @@ impl GrpcPageServiceHandler { }; } - Ok(resp.into()) + Ok(resp) + } + + /// Processes a GetPage request when there is a potential shard split in progress. We have to + /// reroute the request to any local child shards, and split batch requests that straddle + /// multiple child shards. + /// + /// Parent shards are split and removed incrementally (there may be many parent shards when + /// splitting an already-sharded tenant), but the compute is only notified once the overall + /// split commits, which can take several minutes. In the meanwhile, the compute will be sending + /// requests to the parent shards. + /// + /// TODO: add test infrastructure to provoke this situation frequently and for long periods of + /// time, to properly exercise it. + /// + /// TODO: revamp the split protocol to avoid this, e.g.: + /// * Keep the parent shard until the split commits and the compute is notified. + /// * Notify the compute about each subsplit. + /// * Return an error that updates the compute's shard map. + #[instrument(skip_all)] + #[allow(clippy::too_many_arguments)] + async fn maybe_split_get_page( + ctx: &RequestContext, + handles: &mut TimelineHandles, + tenant_id: TenantId, + timeline_id: TimelineId, + parent: ShardIndex, + req: page_api::GetPageRequest, + io_concurrency: IoConcurrency, + received_at: Instant, + ) -> Result { + // Check the first page to see if we have any child shards at all. Otherwise, the compute is + // just talking to the wrong Pageserver. If the parent has been split, the shard now owning + // the page must have a higher shard count. + let timeline = handles + .get( + tenant_id, + timeline_id, + ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])), + ) + .await?; + + let shard_id = timeline.get_shard_identity(); + if shard_id.count <= parent.shard_count { + return Err(HandleUpgradeError::ShutDown.into()); // emulate original error + } + + // Fast path: the request fits in a single shard. + if let Some(shard_index) = + GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size)) + .map_err(|err| tonic::Status::internal(err.to_string()))? + { + // We got the shard ID from the first page, so these must be equal. + assert_eq!(shard_index.shard_number, shard_id.number); + assert_eq!(shard_index.shard_count, shard_id.count); + return Self::get_page(ctx, timeline, req, io_concurrency, received_at).await; + } + + // The request spans multiple shards; split it and dispatch parallel requests. All pages + // were originally in the parent shard, and during a split all children are local, so we + // expect to find local shards for all pages. + let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size)) + .map_err(|err| tonic::Status::internal(err.to_string()))?; + + let mut shard_requests = FuturesUnordered::new(); + for (shard_index, shard_req) in splitter.drain_requests() { + let timeline = handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await?; + let future = Self::get_page( + ctx, + timeline, + shard_req, + io_concurrency.clone(), + received_at, + ) + .map(move |result| result.map(|resp| (shard_index, resp))); + shard_requests.push(future); + } + + while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? { + splitter + .add_response(shard_index, shard_response) + .map_err(|err| tonic::Status::internal(err.to_string()))?; + } + + splitter + .get_response() + .map_err(|err| tonic::Status::internal(err.to_string())) } } @@ -3640,11 +3753,10 @@ impl proto::PageService for GrpcPageServiceHandler { // to be the sweet spot where throughput is saturated. const CHUNK_SIZE: usize = 256 * 1024; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); // Validate the request and decorate the span. - Self::ensure_shard_zero(&timeline)?; if timeline.is_archived() == Some(true) { return Err(tonic::Status::failed_precondition("timeline is archived")); } @@ -3760,11 +3872,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?; span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); @@ -3793,14 +3904,29 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request>, ) -> Result, tonic::Status> { // Extract the timeline from the request and check that it exists. - let ttid = *extract::(&req); + // + // NB: during shard splits, the compute may still send requests to the parent shard. We'll + // reroute requests to the child shards below, but we also detect the common cases here + // where either the shard exists or no shards exist at all. If we have a child shard, we + // can't acquire a weak handle because we don't know which child shard to use yet. + let TenantTimelineId { + tenant_id, + timeline_id, + } = *extract::(&req); let shard_index = *extract::(&req); - let shard_selector = ShardSelector::Known(shard_index); let mut handles = TimelineHandles::new(self.tenant_manager.clone()); - handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await?; + let timeline = match handles + .get(tenant_id, timeline_id, ShardSelector::Known(shard_index)) + .await + { + // The timeline shard exists. Keep a weak handle to reuse for each request. + Ok(timeline) => Some(timeline.downgrade()), + // The shard doesn't exist, but a child shard does. We'll reroute requests later. + Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None, + // Failed to fetch the timeline, and no child shard exists. Error out. + Err(err) => return Err(err.into()), + }; // Spawn an IoConcurrency sidecar, if enabled. let gate_guard = self @@ -3817,11 +3943,9 @@ impl proto::PageService for GrpcPageServiceHandler { let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { - let timeline = handles - .get(ttid.tenant_id, ttid.timeline_id, shard_selector) - .await? - .downgrade(); loop { + // Wait for the next client request. + // // NB: Tonic considers the entire stream to be an in-flight request and will wait // for it to complete before shutting down. React to cancellation between requests. let req = tokio::select! { @@ -3834,16 +3958,44 @@ impl proto::PageService for GrpcPageServiceHandler { Err(err) => Err(err), }, }?; + + let received_at = Instant::now(); let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); - let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) + + // Process the request, using a closure to capture errors. + let process_request = async || { + let req = page_api::GetPageRequest::try_from(req)?; + + // Fast path: use the pre-acquired timeline handle. + if let Some(Ok(timeline)) = timeline.as_ref().map(|t| t.upgrade()) { + return Self::get_page(&ctx, timeline, req, io_concurrency.clone(), received_at) + .instrument(span.clone()) // propagate request span + .await + } + + // The timeline handle is stale. During shard splits, the compute may still be + // sending requests to the parent shard. Try to re-route requests to the child + // shards, and split any batch requests that straddle multiple child shards. + Self::maybe_split_get_page( + &ctx, + &mut handles, + tenant_id, + timeline_id, + shard_index, + req, + io_concurrency.clone(), + received_at, + ) .instrument(span.clone()) // propagate request span - .await; - yield match result { - Ok(resp) => resp, - // Convert per-request errors to GetPageResponses as appropriate, or terminate - // the stream with a tonic::Status. Log the error regardless, since - // ObservabilityLayer can't automatically log stream errors. + .await + }; + + // Return the response. Convert per-request errors to GetPageResponses if + // appropriate, or terminate the stream with a tonic::Status. + yield match process_request().await { + Ok(resp) => resp.into(), Err(status) => { + // Log the error, since ObservabilityLayer won't see stream errors. // TODO: it would be nice if we could propagate the get_page() fields here. span.in_scope(|| { warn!("request failed with {:?}: {}", status.code(), status.message()); @@ -3863,11 +4015,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?; let allow_missing = req.allow_missing; @@ -3900,11 +4051,10 @@ impl proto::PageService for GrpcPageServiceHandler { req: tonic::Request, ) -> Result, tonic::Status> { let received_at = extract::(&req).0; - let timeline = self.get_request_timeline(&req).await?; + let timeline = self.get_request_timeline_shard_zero(&req).await?; let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - Self::ensure_shard_zero(&timeline)?; let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?; span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); @@ -3934,6 +4084,10 @@ impl proto::PageService for GrpcPageServiceHandler { &self, req: tonic::Request, ) -> Result, tonic::Status> { + // TODO: this won't work during shard splits, as the request is directed at a specific shard + // but the parent shard is removed before the split commits and the compute is notified + // (which can take several minutes for large tenants). That's also the case for the libpq + // implementation, so we keep the behavior for now. let timeline = self.get_request_timeline(&req).await?; let ctx = self.ctx.with_scope_timeline(&timeline); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4432b4bba8..0feba5e9c8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -826,6 +826,18 @@ impl TenantManager { peek_slot.is_some() } + /// Returns whether a local shard exists that's a child of the given tenant shard. Note that + /// this just checks for any shard with a larger shard count, and it may not be a direct child + /// of the given shard (their keyspace may not overlap). + pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool { + match &*self.tenants.read().unwrap() { + TenantsMap::Initializing => false, + TenantsMap::Open(slots) | TenantsMap::ShuttingDown(slots) => slots + .range(TenantShardId::tenant_range(tenant_id)) + .any(|(tsid, _)| tsid.shard_count > shard_index.shard_count), + } + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -1524,9 +1536,10 @@ impl TenantManager { // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant // // TODO: keeping the parent as InProgress while spawning the children causes read - // unavailability, as we can't acquire a timeline handle for it. The parent should be - // available for reads until the children are ready -- potentially until *all* subsplits - // across all parent shards are complete and the compute has been notified. See: + // unavailability, as we can't acquire a new timeline handle for it (existing handles appear + // to still work though, even downgraded ones). The parent should be available for reads + // until the children are ready -- potentially until *all* subsplits across all parent + // shards are complete and the compute has been notified. See: // . drop(tenant); let mut parent_slot_guard =