diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index e259742c68..ec4c6cd2fc 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -88,6 +88,12 @@ impl TenantShardId {
     pub fn is_unsharded(&self) -> bool {
         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     }
+    pub fn to_index(&self) -> ShardIndex {
+        ShardIndex {
+            shard_number: self.shard_number,
+            shard_count: self.shard_count,
+        }
+    }
 }
 
 /// Formatting helper
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 9b4b333a92..a296dde07d 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -13,7 +13,10 @@ use anyhow::Context;
 use async_compression::tokio::write::GzipEncoder;
 use bytes::Buf;
 use bytes::Bytes;
+use futures::stream::FuturesUnordered;
 use futures::Stream;
+use futures::StreamExt;
+use pageserver_api::key::Key;
 use pageserver_api::models::TenantState;
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -21,11 +24,14 @@ use pageserver_api::models::{
     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
     PagestreamNblocksRequest, PagestreamNblocksResponse,
 };
+use pageserver_api::shard::ShardIndex;
+use pageserver_api::shard::{ShardCount, ShardNumber};
 use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
 use pq_proto::framed::ConnectionError;
 use pq_proto::FeStartupPacket;
 use pq_proto::{BeMessage, FeMessage, RowDescriptor};
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::io;
 use std::net::TcpListener;
 use std::pin::pin;
@@ -40,6 +46,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::field;
 use tracing::*;
 use utils::id::ConnectionId;
+use utils::sync::gate::GateGuard;
 use utils::{
     auth::{Claims, Scope, SwappableJwtAuth},
     id::{TenantId, TimelineId},
@@ -274,6 +281,13 @@ async fn page_service_conn_main(
     }
 }
 
+/// While a handler holds a reference to a Timeline, it also holds the
+/// timeline's Gate open.
+struct HandlerTimeline {
+    timeline: Arc<Timeline>,
+    _guard: GateGuard,
+}
+
 struct PageServerHandler {
     _conf: &'static PageServerConf,
     broker_client: storage_broker::BrokerClientChannel,
@@ -285,6 +299,14 @@ struct PageServerHandler {
     /// For each query received over the connection,
     /// `process_query` creates a child context from this one.
     connection_ctx: RequestContext,
+
+    /// See [`Self::cache_timeline`] for usage.
+    ///
+    /// Note on size: the typical size of this map is 1.  The largest size we expect
+    /// to see is the number of shards divided by the number of pageservers (typically < 2),
+    /// or the ratio used when splitting shards (i.e. how many children are created from
+    /// one parent shard), where a "large" number might be ~8.
+    shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
 }
 
 #[derive(thiserror::Error, Debug)]
@@ -358,13 +380,46 @@ impl PageServerHandler {
             auth,
             claims: None,
             connection_ctx,
+            shard_timelines: HashMap::new(),
         }
     }
 
-    /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
-    /// this rather than naked flush() in order to shut down promptly.  Without this, we would
-    /// block shutdown of a tenant if a postgres client was failing to consume bytes we send
-    /// in the flush.
+    /// Analogous to calling cancelled() on a Timeline's cancellation token: waits for cancellation.
+    ///
+    /// We use many Timeline objects, and hold GateGuards on all of them.  We must therefore respect
+    /// all of their cancellation tokens.
+    async fn timeline_cancelled(&self) {
+        // A short wait before we expend the cycles to walk our timeline map.  This avoids incurring
+        // that cost every time we check for cancellation.
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        // This function is never called concurrently with code that adds timelines to shard_timelines,
+        // which is enforced by the borrow checker (the future returned by this function carries the
+        // immutable &self).  So it's fine to evaluate shard_timelines after the sleep: we don't risk
+        // missing any inserts to the map.
+
+        let mut futs = self
+            .shard_timelines
+            .values()
+            .map(|ht| ht.timeline.cancel.cancelled())
+            .collect::<FuturesUnordered<_>>();
+
+        futs.next().await;
+    }
+
+    /// Analogous to calling is_cancelled() on a Timeline's cancellation token
+    fn timeline_is_cancelled(&self) -> bool {
+        self.shard_timelines
+            .values()
+            .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
+    }
+
+    /// This function always respects cancellation of any timeline in [`Self::shard_timelines`].  Pass in
+    /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
+    /// cancellation if there aren't any timelines in the cache.
+    ///
+    /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
+    /// timeline cancellation token.
     async fn flush_cancellable(
         &self,
         pgb: &mut PostgresBackend,
@@ -377,6 +432,9 @@ impl PageServerHandler {
             flush_r = pgb.flush() => {
                 Ok(flush_r?)
             },
+            _ = self.timeline_cancelled() => {
+                Err(QueryError::Shutdown)
+            }
             _ = cancel.cancelled() => {
                 Err(QueryError::Shutdown)
             }
@@ -452,7 +510,7 @@ impl PageServerHandler {
 
     #[instrument(skip_all)]
     async fn handle_pagerequests(
-        &self,
+        &mut self,
        pgb: &mut PostgresBackend,
         tenant_id: TenantId,
         timeline_id: TimelineId,
@@ -463,10 +521,6 @@ impl PageServerHandler {
     {
         debug_assert_current_span_has_tenant_and_timeline_id();
 
-        // Note that since one connection may contain getpage requests that target different
-        // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
-        // that we look up here may not be the one that serves all the actual requests: we will double
-        // check the mapping of key->shard later before calling into Timeline for getpage requests.
         let tenant = mgr::get_active_tenant_with_timeout(
             tenant_id,
             ShardSelector::First,
@@ -487,19 +541,9 @@ impl PageServerHandler {
             None
         };
 
-        // Check that the timeline exists
-        let timeline = tenant
-            .get_timeline(timeline_id, true)
-            .map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
-
-        // Avoid starting new requests if the timeline has already started shutting down,
-        // and block timeline shutdown until this request is complete, or drops out due
-        // to cancellation.
-        let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
-
         // switch client to COPYBOTH
         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
-        self.flush_cancellable(pgb, &timeline.cancel).await?;
+        self.flush_cancellable(pgb, &tenant.cancel).await?;
 
         let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
 
@@ -507,7 +551,7 @@ impl PageServerHandler {
             let msg = tokio::select! {
                 biased;
 
-                _ = timeline.cancel.cancelled() => {
+                _ = self.timeline_cancelled() => {
                     // We were requested to shut down.
info!("shutdown request received in page handler"); return Err(QueryError::Shutdown) @@ -544,7 +588,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn); ( - self.handle_get_rel_exists_request(&timeline, &req, &ctx) + self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -554,7 +598,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize); let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn); ( - self.handle_get_nblocks_request(&timeline, &req, &ctx) + self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -564,7 +608,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn); let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); ( - self.handle_get_page_at_lsn_request(&timeline, &req, &ctx) + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -574,7 +618,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize); let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn); ( - self.handle_db_size_request(&timeline, &req, &ctx) + self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -594,7 +638,7 @@ impl PageServerHandler { span.in_scope(|| info!("handler requested reconnect: {reason}")); return Err(QueryError::Reconnect); } - Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => { + Err(e) if self.timeline_is_cancelled() => { // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean // shutdown error, this may be buried inside a PageReconstructError::Other for example. 
                //
@@ -617,7 +661,7 @@ impl PageServerHandler {
                 });
 
                 pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
-                self.flush_cancellable(pgb, &timeline.cancel).await?;
+                self.flush_cancellable(pgb, &tenant.cancel).await?;
             }
         }
     }
@@ -814,11 +858,14 @@ impl PageServerHandler {
     }
     async fn handle_get_rel_exists_request(
-        &self,
-        timeline: &Timeline,
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
         req: &PagestreamExistsRequest,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
+        let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
+
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn =
             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                 .await?;
@@ -834,11 +881,13 @@ impl PageServerHandler {
     }
     async fn handle_get_nblocks_request(
-        &self,
-        timeline: &Timeline,
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
         req: &PagestreamNblocksRequest,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
+        let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn =
             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                 .await?;
@@ -854,11 +903,13 @@ impl PageServerHandler {
     }
     async fn handle_db_size_request(
-        &self,
-        timeline: &Timeline,
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
         req: &PagestreamDbSizeRequest,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
+        let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn =
             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                 .await?;
@@ -880,16 +931,160 @@ impl PageServerHandler {
         }))
     }
 
-    async fn do_handle_get_page_at_lsn_request(
-        &self,
-        timeline: &Timeline,
+    /// For most getpage requests, we will already have a Timeline to serve the request: this function
+    /// looks up such a Timeline synchronously and without touching any global state.
+    fn get_cached_timeline_for_page(
+        &mut self,
+        req: &PagestreamGetPageRequest,
+    ) -> Result<&Arc<Timeline>, Key> {
+        let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
+            // Fastest path: single sharded case
+            if first_idx.shard_count < ShardCount(2) {
+                return Ok(&first_timeline.timeline);
+            }
+
+            let key = rel_block_to_key(req.rel, req.blkno);
+            let shard_num = first_timeline
+                .timeline
+                .get_shard_identity()
+                .get_shard_number(&key);
+
+            // Fast path: matched the first timeline in our local handler map.  This case is common if
+            // only one shard per tenant is attached to this pageserver.
+            if first_timeline.timeline.get_shard_identity().number == shard_num {
+                return Ok(&first_timeline.timeline);
+            }
+
+            let shard_index = ShardIndex {
+                shard_number: shard_num,
+                shard_count: first_timeline.timeline.get_shard_identity().count,
+            };
+
+            // Fast-ish path: timeline is in the connection handler's local cache
+            if let Some(found) = self.shard_timelines.get(&shard_index) {
+                return Ok(&found.timeline);
+            }
+
+            key
+        } else {
+            rel_block_to_key(req.rel, req.blkno)
+        };
+
+        Err(key)
+    }
+
+    /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
+    /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
+    /// again.
+    ///
+    /// Note that all the Timelines in this cache are for the same timeline_id: they differ
+    /// in which shard they belong to.  When we serve a getpage@lsn request, we choose a shard
+    /// based on key.
+    ///
+    /// The typical size of this cache is 1, as we generally create shards to distribute work
+    /// across pageservers, so we don't tend to have multiple shards for the same tenant on the
+    /// same pageserver.
+    fn cache_timeline(
+        &mut self,
+        timeline: Arc<Timeline>,
+    ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
+        let gate_guard = timeline
+            .gate
+            .enter()
+            .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
+
+        let shard_index = timeline.tenant_shard_id.to_index();
+        let entry = self
+            .shard_timelines
+            .entry(shard_index)
+            .or_insert(HandlerTimeline {
+                timeline,
+                _guard: gate_guard,
+            });
+
+        Ok(&entry.timeline)
+    }
+
+    /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
+    /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver.  If no such
+    /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
+    async fn load_timeline_for_page(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        key: Key,
+    ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
+        // Slow path: we must call out to the TenantManager to find the timeline for this Key
+        let timeline = self
+            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
+            .await?;
+
+        self.cache_timeline(timeline)
+    }
+
+    async fn get_timeline_shard_zero(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
+        // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
+        // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
+        // ref to self.  So instead, we first build a bool, and then return while not borrowing self.
+        let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
+            idx.shard_number == ShardNumber(0)
+        } else {
+            false
+        };
+
+        if have_cached {
+            let entry = self.shard_timelines.iter().next().unwrap();
+            Ok(&entry.1.timeline)
+        } else {
+            let timeline = self
+                .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
+                .await?;
+            Ok(self.cache_timeline(timeline)?)
+        }
+    }
+
+    async fn handle_get_page_at_lsn_request(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
         req: &PagestreamGetPageRequest,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
+        let timeline = match self.get_cached_timeline_for_page(req) {
+            Ok(tl) => tl,
+            Err(key) => {
+                match self
+                    .load_timeline_for_page(tenant_id, timeline_id, key)
+                    .await
+                {
+                    Ok(t) => t,
+                    Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
+                        // We already know this tenant exists in general, because we resolved it at
+                        // start of connection.  Getting a NotFound here indicates that the shard containing
+                        // the requested page is not present on this node: the client's knowledge of shard->pageserver
+                        // mapping is out of date.
+                        //
+                        // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
+                        // the client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
+                        // and talk to a different pageserver.
+                        return Err(PageStreamError::Reconnect(
+                            "getpage@lsn request routed to wrong shard".into(),
+                        ));
+                    }
+                    Err(e) => return Err(e.into()),
+                }
+            }
+        };
+
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn =
             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                 .await?;
+
         let page = timeline
             .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
             .await?;
@@ -899,60 +1094,6 @@ impl PageServerHandler {
         }))
     }
 
-    async fn handle_get_page_at_lsn_request(
-        &self,
-        timeline: &Timeline,
-        req: &PagestreamGetPageRequest,
-        ctx: &RequestContext,
-    ) -> Result<PagestreamBeMessage, PageStreamError> {
-        let key = rel_block_to_key(req.rel, req.blkno);
-        if timeline.get_shard_identity().is_key_local(&key) {
-            self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
-                .await
-        } else {
-            // The Tenant shard we looked up at connection start does not hold this particular
-            // key: look for other shards in this tenant.  This scenario occurs if a pageserver
-            // has multiple shards for the same tenant.
-            //
-            // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
-            let timeline = match self
-                .get_active_tenant_timeline(
-                    timeline.tenant_shard_id.tenant_id,
-                    timeline.timeline_id,
-                    ShardSelector::Page(key),
-                )
-                .await
-            {
-                Ok(t) => t,
-                Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
-                    // We already know this tenant exists in general, because we resolved it at
-                    // start of connection.  Getting a NotFound here indicates that the shard containing
-                    // the requested page is not present on this node: the client's knowledge of shard->pageserver
-                    // mapping is out of date.
-                    tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
-                        timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
-                    // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
-                    // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
-                    // and talk to a different pageserver.
-                    return Err(PageStreamError::Reconnect(
-                        "getpage@lsn request routed to wrong shard".into(),
-                    ));
-                }
-                Err(e) => return Err(e.into()),
-            };
-
-            // Take a GateGuard for the duration of this request.  If we were using our main Timeline object,
-            // the GateGuard was already held over the whole connection.
-            let _timeline_guard = timeline
-                .gate
-                .enter()
-                .map_err(|_| PageStreamError::Shutdown)?;
-
-            self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
-                .await
-        }
-    }
-
     #[allow(clippy::too_many_arguments)]
     #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
     async fn handle_basebackup_request(
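
For context, a minimal sketch (not part of the diff) of the cache-then-fallback routing that the new getpage path follows; the free-standing helper `resolve_timeline_for_page` and its error mapping via `From<GetActiveTimelineError>` are illustrative assumptions, the real call sites live inline in `handle_get_page_at_lsn_request`:

// Illustrative sketch only: mirrors get_cached_timeline_for_page / load_timeline_for_page.
// `handler` is the per-connection &mut PageServerHandler.
async fn resolve_timeline_for_page(
    handler: &mut PageServerHandler,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: &PagestreamGetPageRequest,
) -> Result<Arc<Timeline>, PageStreamError> {
    match handler.get_cached_timeline_for_page(req) {
        // Fast path: the shard owning this block is already in the per-connection
        // HashMap<ShardIndex, HandlerTimeline>, with its GateGuard held open.
        Ok(tl) => Ok(tl.clone()),
        // Miss: the Err carries the Key; resolve the owning shard via the tenant
        // manager (ShardSelector::Page(key)) and cache it for subsequent requests.
        Err(key) => handler
            .load_timeline_for_page(tenant_id, timeline_id, key)
            .await
            .map(Arc::clone)
            .map_err(PageStreamError::from),
    }
}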