hack: allow using async inside Tenant::activate

This commit is contained in:
Christian Schwarz
2023-05-10 19:24:11 +02:00
parent ef7d20f582
commit f670caa4d8

View File

@@ -12,10 +12,12 @@
//!
use anyhow::{bail, Context};
use either::Either;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
@@ -1592,32 +1594,32 @@ impl Tenant {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
self.state.send_modify(|current_state| {
Self::state_send_modify_async(&self.state, |current_state| {
match &*current_state {
TenantState::Active => {
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
Either::Left(None)
}
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state due to: {reason}",
));
Either::Left(None)
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state, skipping activation");
Either::Left(None)
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let span = tracing::Span::current();
std::thread::scope(move |scope| {
scope.spawn(move || {
let _entered = span.enter();
let mut post_state = TenantState::Active;
Either::Right(
async move {
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines =
timelines_accessor.values().filter(|timeline| {
@@ -1645,7 +1647,7 @@ impl Tenant {
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::broken_from_reason(format!(
post_state = TenantState::broken_from_reason(format!(
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
@@ -1666,11 +1668,14 @@ impl Tenant {
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&*current_state),
post_state = <&'static str>::from(&post_state),
"activation attempt finished"
);
});
});
Some(post_state)
}
.in_current_span(),
)
}
}
});
@@ -1679,30 +1684,34 @@ impl Tenant {
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
Self::state_send_modify_async(&self.state, |current_state| {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
Either::Right(async move {
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
None
})
}
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
Either::Left(None)
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state");
Either::Left(None)
}
}
});
@@ -1739,6 +1748,40 @@ impl Tenant {
});
}
fn state_send_modify_async<MakeFut, Fut, T>(
watch_sender: &tokio::sync::watch::Sender<T>,
async_clos: MakeFut,
) where
MakeFut: FnOnce(&mut T) -> Either<Option<T>, Fut> + Send,
Fut: Future<Output = Option<T>> + Send,
T: Send,
{
let rt = tokio::runtime::Handle::current();
let span = tracing::Span::current();
watch_sender.send_modify(|current_state| {
match async_clos(current_state) {
Either::Left(None) => {},
Either::Left(Some(update)) => {
*current_state = update;
}
Either::Right(fut) => {
let maybe_update = std::thread::scope(|scope| {
let jh = scope.spawn(|| {
rt.block_on(fut.instrument(span))
});
jh.join().expect(
"the thread that executes the closure panicked, likely self.state is poisoned now",
)
});
match maybe_update {
None => {},
Some(update) => {*current_state = update},
}
}
}
});
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
self.state.subscribe()
}