From b2c240e445db5bf678495b8a915863f503f05ada Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Jul 2024 13:29:44 +0000 Subject: [PATCH] page_service: pagestream sub-protocol: only hold GateGuard while handling requests --- pageserver/src/page_service.rs | 234 ++++++++++++++++++++++++--------- 1 file changed, 172 insertions(+), 62 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a440ad6378..394d5145ba 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -28,6 +28,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::io; use std::net::TcpListener; +use std::ops::Deref; use std::pin::pin; use std::str; use std::str::FromStr; @@ -41,7 +42,6 @@ use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::ConnectionId; -use utils::sync::gate::GateGuard; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -291,11 +291,77 @@ async fn page_service_conn_main( } } -/// While a handler holds a reference to a Timeline, it also holds a the -/// timeline's Gate open. -struct HandlerTimeline { - timeline: Arc, - _guard: GateGuard, +/// Wrapper around a [`std::sync::Weak`] to enforce separation of the +/// lifecycles of page_service connection and timeline. +/// +/// Use [`HandlerTimeline::upgrade`] to get an [`handler_timeline::Entered`] instance. +/// This will fail if the Timeline has been cancelled via cancellation +/// token or has already been deallocated. +/// On success, the returned [`handler_timeline::Entered`] holds the Timeline's gate open. +mod handler_timeline { + use tokio_util::sync::CancellationToken; + use utils::sync::gate::{GateError, GateGuard}; + + use crate::tenant::Timeline; + use std::sync::{Arc, Weak}; + + pub(super) struct HandlerTimeline { + timeline: Weak, + /// Child token of [`Timeline::cancel`]. + cancel: CancellationToken, + } + pub(super) enum Error { + Cancelled, + } + pub(super) struct Entered { + timeline: Arc, + _gate_guard: GateGuard, + } + impl std::ops::Deref for Entered { + type Target = Timeline; + fn deref(&self) -> &Self::Target { + &self.timeline + } + } + impl HandlerTimeline { + pub(super) fn from(timeline: Arc) -> Result { + Ok(HandlerTimeline { + timeline: Arc::downgrade(&timeline), + cancel: timeline.cancel.child_token(), + }) + } + pub(super) fn upgrade(&self) -> Result { + // checking our child token instead of the parent token in the timeline + // avoids cache contention on `Timeline::cancel` + if self.cancel.is_cancelled() { + return Err(Error::Cancelled); + } + let timeline = Weak::upgrade(&self.timeline).ok_or(Error::Cancelled)?; + let _gate_guard = match timeline.gate.enter() { + Ok(guard) => guard, + Err(GateError::GateClosed) => return Err(Error::Cancelled), + }; + + Ok(Entered { + timeline, + _gate_guard, + }) + } + + pub(super) async fn cancelled(&self) { + self.cancel.cancelled().await; + } + + pub(super) fn is_cancelled(&self) -> bool { + self.cancel.is_cancelled() + } + } +} +use handler_timeline::HandlerTimeline; + +enum GetCachedTimelineForPage { + Cancelled, + AskLoadTimelineForPage(Key), } struct PageServerHandler { @@ -437,7 +503,7 @@ impl PageServerHandler { cancellation_sources.extend( self.shard_timelines .values() - .map(|ht| Either::Right(ht.timeline.cancel.cancelled())), + .map(|ht| Either::Right(ht.cancelled())), ); FuturesUnordered::from_iter(cancellation_sources) .next() @@ -447,10 +513,16 @@ impl PageServerHandler { /// Checking variant of [`Self::await_connection_cancelled`]. fn is_connection_cancelled(&self) -> bool { task_mgr::is_shutdown_requested() - || self - .shard_timelines - .values() - .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping()) + || self.shard_timelines.values().any(|ht| { + if ht.is_cancelled() { + return true; + } + let ht = match ht.upgrade() { + Ok(ht) => ht, + Err(handler_timeline::Error::Cancelled) => return true, + }; + ht.is_stopping() + }) } /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in @@ -967,7 +1039,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1000,7 +1072,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1033,7 +1105,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1056,33 +1128,53 @@ impl PageServerHandler { fn get_cached_timeline_for_page( &mut self, req: &PagestreamGetPageRequest, - ) -> Result<&Arc, Key> { + ) -> Result { let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() { // Fastest path: single sharded case if first_idx.shard_count.count() == 1 { - return Ok(&first_timeline.timeline); + match first_timeline.upgrade() { + Ok(tl) => return Ok(tl), + Err(handler_timeline::Error::Cancelled) => { + let first_idx = *first_idx; + self.shard_timelines.remove(&first_idx); + return Err(GetCachedTimelineForPage::Cancelled); + } + }; } let key = rel_block_to_key(req.rel, req.blkno); - let shard_num = first_timeline - .timeline - .get_shard_identity() - .get_shard_number(&key); + + let first_timeline = match first_timeline.upgrade() { + Ok(tl) => tl, + Err(handler_timeline::Error::Cancelled) => { + let first_idx = *first_idx; + self.shard_timelines.remove(&first_idx); + return Err(GetCachedTimelineForPage::Cancelled); + } + }; + + let shard_num = first_timeline.get_shard_identity().get_shard_number(&key); // Fast path: matched the first timeline in our local handler map. This case is common if // only one shard per tenant is attached to this pageserver. - if first_timeline.timeline.get_shard_identity().number == shard_num { - return Ok(&first_timeline.timeline); + if first_timeline.get_shard_identity().number == shard_num { + return Ok(first_timeline); } let shard_index = ShardIndex { shard_number: shard_num, - shard_count: first_timeline.timeline.get_shard_identity().count, + shard_count: first_timeline.get_shard_identity().count, }; // Fast-ish path: timeline is in the connection handler's local cache if let Some(found) = self.shard_timelines.get(&shard_index) { - return Ok(&found.timeline); + match found.upgrade() { + Ok(tl) => return Ok(tl), + Err(handler_timeline::Error::Cancelled) => { + self.shard_timelines.remove(&shard_index); + return Err(GetCachedTimelineForPage::Cancelled); + } + } } key @@ -1090,7 +1182,7 @@ impl PageServerHandler { rel_block_to_key(req.rel, req.blkno) }; - Err(key) + Err(GetCachedTimelineForPage::AskLoadTimelineForPage(key)) } /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable @@ -1107,22 +1199,14 @@ impl PageServerHandler { fn cache_timeline( &mut self, timeline: Arc, - ) -> Result<&Arc, GetActiveTimelineError> { - let gate_guard = timeline - .gate - .enter() - .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?; - + ) -> Result { let shard_index = timeline.tenant_shard_id.to_index(); let entry = self .shard_timelines .entry(shard_index) - .or_insert(HandlerTimeline { - timeline, - _guard: gate_guard, - }); + .or_insert(HandlerTimeline::from(timeline)?); - Ok(&entry.timeline) + Ok(entry.upgrade()?) } /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with @@ -1133,7 +1217,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, key: Key, - ) -> anyhow::Result<&Arc, GetActiveTimelineError> { + ) -> Result { // Slow path: we must call out to the TenantManager to find the timeline for this Key let timeline = self .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key)) @@ -1146,25 +1230,39 @@ impl PageServerHandler { &mut self, tenant_id: TenantId, timeline_id: TimelineId, - ) -> anyhow::Result<&Arc, GetActiveTimelineError> { - // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because - // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable - // ref to salf. So instead, we first build a bool, and then return while not borrowing self. - let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() { - idx.shard_number == ShardNumber(0) - } else { - false + ) -> Result, GetActiveTimelineError> { + let mut cancelled = vec![]; + let timeline = { + let mut iter = self.shard_timelines.iter(); + loop { + let Some((idx, tl)) = iter.next() else { + break None; + }; + if idx.shard_number != ShardNumber(0) { + continue; + } + match tl.upgrade() { + Ok(tl) => break Some(tl), + Err(handler_timeline::Error::Cancelled) => { + cancelled.push(*idx); + continue; + } + } + } }; - - if have_cached { - let entry = self.shard_timelines.iter().next().unwrap(); - Ok(&entry.1.timeline) - } else { - let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - Ok(self.cache_timeline(timeline)?) + for idx in cancelled { + self.shard_timelines.remove(&idx); } + let timeline = match timeline { + Some(timeline) => timeline, + None => { + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) + .await?; + self.cache_timeline(timeline)? + } + }; + Ok(timeline) } #[instrument(skip_all, fields(shard_id))] @@ -1177,10 +1275,13 @@ impl PageServerHandler { ) -> Result { let timeline = match self.get_cached_timeline_for_page(req) { Ok(tl) => { - set_tracing_field_shard_id(tl); + set_tracing_field_shard_id(&tl); tl } - Err(key) => { + Err(GetCachedTimelineForPage::Cancelled) => { + return Err(PageStreamError::Shutdown); + } + Err(GetCachedTimelineForPage::AskLoadTimelineForPage(key)) => { match self .load_timeline_for_page(tenant_id, timeline_id, key) .await @@ -1210,7 +1311,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1243,7 +1344,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1289,10 +1390,8 @@ impl PageServerHandler { let started = std::time::Instant::now(); - // check that the timeline exists - let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) - .await?; + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -1962,6 +2061,17 @@ impl From for QueryError { } } +impl From for GetActiveTimelineError { + fn from(e: handler_timeline::Error) -> Self { + match e { + handler_timeline::Error::Cancelled => { + // XXX GetActiveTimelineError should have a Cancelled variant + GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) + } + } + } +} + fn set_tracing_field_shard_id(timeline: &Timeline) { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); tracing::Span::current().record(