From f670caa4d83ce72f185cb505765d89fb0fdd5281 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 10 May 2023 19:24:11 +0200 Subject: [PATCH] hack: allow using async inside Tenant::activate --- pageserver/src/tenant.rs | 93 +++++++++++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 98ed363386..2eb61b185c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -12,10 +12,12 @@ //! use anyhow::{bail, Context}; +use either::Either; use futures::FutureExt; use pageserver_api::models::TimelineState; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; +use std::future::Future; use tokio::sync::watch; use tokio::task::JoinSet; use tracing::*; @@ -1592,32 +1594,32 @@ impl Tenant { debug_assert_current_span_has_tenant_id(); let mut result = Ok(()); - self.state.send_modify(|current_state| { + Self::state_send_modify_async(&self.state, |current_state| { match &*current_state { TenantState::Active => { // activate() was called on an already Active tenant. Shouldn't happen. result = Err(anyhow::anyhow!("Tenant is already active")); + Either::Left(None) } TenantState::Broken { reason, .. } => { // This shouldn't happen either result = Err(anyhow::anyhow!( "Could not activate tenant because it is in broken state due to: {reason}", )); + Either::Left(None) } TenantState::Stopping => { // The tenant was detached, or system shutdown was requested, while we were // loading or attaching the tenant. 
info!("Tenant is already in Stopping state, skipping activation"); + Either::Left(None) } TenantState::Loading | TenantState::Attaching => { *current_state = TenantState::Active; - - debug!(tenant_id = %self.tenant_id, "Activating tenant"); - - let span = tracing::Span::current(); - std::thread::scope(move |scope| { - scope.spawn(move || { - let _entered = span.enter(); + let mut post_state = TenantState::Active; + Either::Right( + async move { + debug!(tenant_id = %self.tenant_id, "Activating tenant"); let timelines_accessor = self.timelines.lock().unwrap(); let not_broken_timelines = timelines_accessor.values().filter(|timeline| { @@ -1645,7 +1647,7 @@ impl Tenant { timeline.timeline_id, e ); timeline.set_state(TimelineState::Broken); - *current_state = TenantState::broken_from_reason(format!( + post_state = TenantState::broken_from_reason(format!( "failed to activate timeline {}: {}", timeline.timeline_id, e )); @@ -1666,11 +1668,14 @@ impl Tenant { activated_timelines, timelines_broken_during_activation, total_timelines, - post_state = <&'static str>::from(&*current_state), + post_state = <&'static str>::from(&post_state), "activation attempt finished" ); - }); - }); + + Some(post_state) + } + .in_current_span(), + ) } } }); @@ -1679,30 +1684,34 @@ impl Tenant { /// Change tenant status to Stopping, to mark that it is being shut down pub fn set_stopping(&self) { - self.state.send_modify(|current_state| { + Self::state_send_modify_async(&self.state, |current_state| { match current_state { TenantState::Active | TenantState::Loading | TenantState::Attaching => { *current_state = TenantState::Stopping; - - // FIXME: If the tenant is still Loading or Attaching, new timelines - // might be created after this. That's harmless, as the Timelines - // won't be accessible to anyone, when the Tenant is in Stopping - // state. 
- let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); - for timeline in not_broken_timelines { - timeline.set_state(TimelineState::Stopping); - } + Either::Right(async move { + // FIXME: If the tenant is still Loading or Attaching, new timelines + // might be created after this. That's harmless, as the Timelines + // won't be accessible to anyone, when the Tenant is in Stopping + // state. + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Stopping); + } + None + }) } TenantState::Broken { reason, .. } => { info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"); + Either::Left(None) } TenantState::Stopping => { // The tenant was detached, or system shutdown was requested, while we were // loading or attaching the tenant. 
info!("Tenant is already in Stopping state"); + Either::Left(None) } } }); @@ -1739,6 +1748,40 @@ impl Tenant { }); } + fn state_send_modify_async<T, MakeFut, Fut>( + watch_sender: &tokio::sync::watch::Sender<T>, + async_clos: MakeFut, + ) where + MakeFut: FnOnce(&mut T) -> Either<Option<T>, Fut> + Send, + Fut: Future<Output = Option<T>> + Send, + T: Send, + { + let rt = tokio::runtime::Handle::current(); + let span = tracing::Span::current(); + watch_sender.send_modify(|current_state| { + match async_clos(current_state) { + Either::Left(None) => {}, + Either::Left(Some(update)) => { + *current_state = update; + } + Either::Right(fut) => { + let maybe_update = std::thread::scope(|scope| { + let jh = scope.spawn(|| { + rt.block_on(fut.instrument(span)) + }); + jh.join().expect( + "the thread that executes the closure panicked, likely self.state is poisoned now", + ) + }); + match maybe_update { + None => {}, + Some(update) => {*current_state = update}, + } + } + } + }); + } + pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> { self.state.subscribe() }