From 887e94d7dadad312f31f8f9d11ef56d4872e2489 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 16 Jan 2024 09:39:19 +0000 Subject: [PATCH] page_service: more efficient page_service -> shard lookup (#6037) ## Problem In #5980 the page service connection handler gets a simple piece of logic for finding the right Timeline: at connection time, it picks an arbitrary Timeline, and then when handling individual page requests it checks if the original timeline is the correct shard, and if not looks one up. This is pretty slow in the case where we have to go look up the other timeline, because we take the big tenants manager lock. ## Summary of changes - Add a `shard_timelines` map of ShardIndex to Timeline on the page service connection handler - When looking up a Timeline for a particular ShardIndex, consult `shard_timelines` to avoid hitting the TenantsManager unless we really need to. - Re-work the CancellationToken handling, because the handler now holds gateguards on multiple timelines, and so must respect cancellation of _any_ timeline it has in its cache, not just the timeline related to the request it is currently servicing. --------- Co-authored-by: Vlad Lazar --- libs/pageserver_api/src/shard.rs | 6 + pageserver/src/page_service.rs | 321 ++++++++++++++++++++++--------- 2 files changed, 237 insertions(+), 90 deletions(-) diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index e259742c68..ec4c6cd2fc 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -88,6 +88,12 @@ impl TenantShardId { pub fn is_unsharded(&self) -> bool { self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0) } + pub fn to_index(&self) -> ShardIndex { + ShardIndex { + shard_number: self.shard_number, + shard_count: self.shard_count, + } + } } /// Formatting helper diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9b4b333a92..a296dde07d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -13,7 +13,10 @@ use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use bytes::Bytes; +use futures::stream::FuturesUnordered; use futures::Stream; +use futures::StreamExt; +use pageserver_api::key::Key; use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -21,11 +24,14 @@ use pageserver_api::models::{ PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, }; +use pageserver_api::shard::ShardIndex; +use pageserver_api::shard::{ShardCount, ShardNumber}; use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError}; use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; +use std::collections::HashMap; use std::io; use std::net::TcpListener; use std::pin::pin; @@ -40,6 +46,7 @@ use tokio_util::sync::CancellationToken; use tracing::field; use tracing::*; use utils::id::ConnectionId; +use utils::sync::gate::GateGuard; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -274,6 +281,13 @@ async fn page_service_conn_main( } } +/// While a handler holds a reference to a Timeline, it also holds a the +/// timeline's Gate open. +struct HandlerTimeline { + timeline: Arc, + _guard: GateGuard, +} + struct PageServerHandler { _conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, @@ -285,6 +299,14 @@ struct PageServerHandler { /// For each query received over the connection, /// `process_query` creates a child context from this one. connection_ctx: RequestContext, + + /// See [`Self::cache_timeline`] for usage. + /// + /// Note on size: the typical size of this map is 1. The largest size we expect + /// to see is the number of shards divided by the number of pageservers (typically < 2), + /// or the ratio used when splitting shards (i.e. how many children created from one) + /// parent shard, where a "large" number might be ~8. + shard_timelines: HashMap, } #[derive(thiserror::Error, Debug)] @@ -358,13 +380,46 @@ impl PageServerHandler { auth, claims: None, connection_ctx, + shard_timelines: HashMap::new(), } } - /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use - /// this rather than naked flush() in order to shut down promptly. Without this, we would - /// block shutdown of a tenant if a postgres client was failing to consume bytes we send - /// in the flush. + /// Analogous to calling cancelled() on a Timeline's cancellation token: waits for cancellation. + /// + /// We use many Timeline objects, and hold GateGuards on all of them. We must therefore respect + /// all of their cancellation tokens. + async fn timeline_cancelled(&self) { + // A short wait before we expend the cycles to walk our timeline map. This avoids incurring + // that cost every time we check for cancellation. + tokio::time::sleep(Duration::from_millis(10)).await; + + // This function is never called concurrently with code that adds timelines to shard_timelines, + // which is enforced by the borrow checker (the future returned by this function carries the + // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk + // missing any inserts to the map. + + let mut futs = self + .shard_timelines + .values() + .map(|ht| ht.timeline.cancel.cancelled()) + .collect::>(); + + futs.next().await; + } + + /// Analogous to calling is_cancelled() on a Timeline's cancellation token + fn timeline_is_cancelled(&self) -> bool { + self.shard_timelines + .values() + .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping()) + } + + /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in + /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect + /// cancellation if there aren't any timelines in the cache. + /// + /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the + /// timeline cancellation token. async fn flush_cancellable( &self, pgb: &mut PostgresBackend, @@ -377,6 +432,9 @@ impl PageServerHandler { flush_r = pgb.flush() => { Ok(flush_r?) }, + _ = self.timeline_cancelled() => { + Err(QueryError::Shutdown) + } _ = cancel.cancelled() => { Err(QueryError::Shutdown) } @@ -452,7 +510,7 @@ impl PageServerHandler { #[instrument(skip_all)] async fn handle_pagerequests( - &self, + &mut self, pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, @@ -463,10 +521,6 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id(); - // Note that since one connection may contain getpage requests that target different - // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant - // that we look up here may not be the one that serves all the actual requests: we will double - // check the mapping of key->shard later before calling into Timeline for getpage requests. let tenant = mgr::get_active_tenant_with_timeout( tenant_id, ShardSelector::First, @@ -487,19 +541,9 @@ impl PageServerHandler { None }; - // Check that the timeline exists - let timeline = tenant - .get_timeline(timeline_id, true) - .map_err(|e| QueryError::NotFound(format!("{e}").into()))?; - - // Avoid starting new requests if the timeline has already started shutting down, - // and block timeline shutdown until this request is complete, or drops out due - // to cancellation. - let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?; - // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; - self.flush_cancellable(pgb, &timeline.cancel).await?; + self.flush_cancellable(pgb, &tenant.cancel).await?; let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id); @@ -507,7 +551,7 @@ impl PageServerHandler { let msg = tokio::select! { biased; - _ = timeline.cancel.cancelled() => { + _ = self.timeline_cancelled() => { // We were requested to shut down. info!("shutdown request received in page handler"); return Err(QueryError::Shutdown) @@ -544,7 +588,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn); ( - self.handle_get_rel_exists_request(&timeline, &req, &ctx) + self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -554,7 +598,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize); let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn); ( - self.handle_get_nblocks_request(&timeline, &req, &ctx) + self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -564,7 +608,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn); let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); ( - self.handle_get_page_at_lsn_request(&timeline, &req, &ctx) + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -574,7 +618,7 @@ impl PageServerHandler { let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize); let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn); ( - self.handle_db_size_request(&timeline, &req, &ctx) + self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, span, @@ -594,7 +638,7 @@ impl PageServerHandler { span.in_scope(|| info!("handler requested reconnect: {reason}")); return Err(QueryError::Reconnect); } - Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => { + Err(e) if self.timeline_is_cancelled() => { // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean // shutdown error, this may be buried inside a PageReconstructError::Other for example. // @@ -617,7 +661,7 @@ impl PageServerHandler { }); pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - self.flush_cancellable(pgb, &timeline.cancel).await?; + self.flush_cancellable(pgb, &tenant.cancel).await?; } } } @@ -814,11 +858,14 @@ impl PageServerHandler { } async fn handle_get_rel_exists_request( - &self, - timeline: &Timeline, + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -834,11 +881,13 @@ impl PageServerHandler { } async fn handle_get_nblocks_request( - &self, - timeline: &Timeline, + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -854,11 +903,13 @@ impl PageServerHandler { } async fn handle_db_size_request( - &self, - timeline: &Timeline, + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) @@ -880,16 +931,160 @@ impl PageServerHandler { })) } - async fn do_handle_get_page_at_lsn_request( - &self, - timeline: &Timeline, + /// For most getpage requests, we will already have a Timeline to serve the request: this function + /// looks up such a Timeline synchronously and without touching any global state. + fn get_cached_timeline_for_page( + &mut self, + req: &PagestreamGetPageRequest, + ) -> Result<&Arc, Key> { + let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() { + // Fastest path: single sharded case + if first_idx.shard_count < ShardCount(2) { + return Ok(&first_timeline.timeline); + } + + let key = rel_block_to_key(req.rel, req.blkno); + let shard_num = first_timeline + .timeline + .get_shard_identity() + .get_shard_number(&key); + + // Fast path: matched the first timeline in our local handler map. This case is common if + // only one shard per tenant is attached to this pageserver. + if first_timeline.timeline.get_shard_identity().number == shard_num { + return Ok(&first_timeline.timeline); + } + + let shard_index = ShardIndex { + shard_number: shard_num, + shard_count: first_timeline.timeline.get_shard_identity().count, + }; + + // Fast-ish path: timeline is in the connection handler's local cache + if let Some(found) = self.shard_timelines.get(&shard_index) { + return Ok(&found.timeline); + } + + key + } else { + rel_block_to_key(req.rel, req.blkno) + }; + + Err(key) + } + + /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable + /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`] + /// again. + /// + /// Note that all the Timelines in this cache are for the same timeline_id: they're differ + /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard + /// based on key. + /// + /// The typical size of this cache is 1, as we generally create shards to distribute work + /// across pageservers, so don't tend to have multiple shards for the same tenant on the + /// same pageserver. + fn cache_timeline( + &mut self, + timeline: Arc, + ) -> Result<&Arc, GetActiveTimelineError> { + let gate_guard = timeline + .gate + .enter() + .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?; + + let shard_index = timeline.tenant_shard_id.to_index(); + let entry = self + .shard_timelines + .entry(shard_index) + .or_insert(HandlerTimeline { + timeline, + _guard: gate_guard, + }); + + Ok(&entry.timeline) + } + + /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with + /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such + /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node). + async fn load_timeline_for_page( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + key: Key, + ) -> anyhow::Result<&Arc, GetActiveTimelineError> { + // Slow path: we must call out to the TenantManager to find the timeline for this Key + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key)) + .await?; + + self.cache_timeline(timeline) + } + + async fn get_timeline_shard_zero( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result<&Arc, GetActiveTimelineError> { + // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because + // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable + // ref to salf. So instead, we first build a bool, and then return while not borrowing self. + let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() { + idx.shard_number == ShardNumber(0) + } else { + false + }; + + if have_cached { + let entry = self.shard_timelines.iter().next().unwrap(); + Ok(&entry.1.timeline) + } else { + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) + .await?; + Ok(self.cache_timeline(timeline)?) + } + } + + async fn handle_get_page_at_lsn_request( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, req: &PagestreamGetPageRequest, ctx: &RequestContext, ) -> Result { + let timeline = match self.get_cached_timeline_for_page(req) { + Ok(tl) => tl, + Err(key) => { + match self + .load_timeline_for_page(tenant_id, timeline_id, key) + .await + { + Ok(t) => t, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return Err(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + )); + } + Err(e) => return Err(e.into()), + } + } + }; + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) .await?; + let page = timeline .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) .await?; @@ -899,60 +1094,6 @@ impl PageServerHandler { })) } - async fn handle_get_page_at_lsn_request( - &self, - timeline: &Timeline, - req: &PagestreamGetPageRequest, - ctx: &RequestContext, - ) -> Result { - let key = rel_block_to_key(req.rel, req.blkno); - if timeline.get_shard_identity().is_key_local(&key) { - self.do_handle_get_page_at_lsn_request(timeline, req, ctx) - .await - } else { - // The Tenant shard we looked up at connection start does not hold this particular - // key: look for other shards in this tenant. This scenario occurs if a pageserver - // has multiple shards for the same tenant. - // - // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037) - let timeline = match self - .get_active_tenant_timeline( - timeline.tenant_shard_id.tenant_id, - timeline.timeline_id, - ShardSelector::Page(key), - ) - .await - { - Ok(t) => t, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}", - timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key); - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. - return Err(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into(), - )); - } - Err(e) => return Err(e.into()), - }; - - // Take a GateGuard for the duration of this request. If we were using our main Timeline object, - // the GateGuard was already held over the whole connection. - let _timeline_guard = timeline - .gate - .enter() - .map_err(|_| PageStreamError::Shutdown)?; - - self.do_handle_get_page_at_lsn_request(&timeline, req, ctx) - .await - } - } - #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request(