Compare commits

...

2 Commits

Author SHA1 Message Date
Christian Schwarz
f670caa4d8 hack: allow using async inside Tenant::activate 2023-05-22 11:40:55 +02:00
Christian Schwarz
ef7d20f582 refactor: prepare to allow async code inside Tenant::state.send_modify() 2023-05-22 11:39:50 +02:00

View File

@@ -12,10 +12,12 @@
//!
use anyhow::{bail, Context};
use either::Either;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
@@ -1592,78 +1594,88 @@ impl Tenant {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
self.state.send_modify(|current_state| {
Self::state_send_modify_async(&self.state, |current_state| {
match &*current_state {
TenantState::Active => {
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
Either::Left(None)
}
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state due to: {reason}",
));
Either::Left(None)
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state, skipping activation");
Either::Left(None)
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
let mut post_state = TenantState::Active;
Either::Right(
async move {
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines =
timelines_accessor.values().filter(|timeline| {
timeline.current_state() != TimelineState::Broken
});
debug!(tenant_id = %self.tenant_id, "Activating tenant");
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {
activated_timelines += 1;
}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
post_state = TenantState::broken_from_reason(format!(
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {
activated_timelines += 1;
timelines_broken_during_activation += 1;
}
}
}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::broken_from_reason(format!(
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
timelines_broken_during_activation += 1;
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&post_state),
"activation attempt finished"
);
Some(post_state)
}
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
.in_current_span(),
)
}
}
});
@@ -1672,30 +1684,34 @@ impl Tenant {
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
Self::state_send_modify_async(&self.state, |current_state| {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
Either::Right(async move {
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
None
})
}
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
Either::Left(None)
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state");
Either::Left(None)
}
}
});
@@ -1732,6 +1748,40 @@ impl Tenant {
});
}
/// Run a closure against the current value of `watch_sender`, allowing the
/// closure to hand back *async* follow-up work.
///
/// The closure is invoked inside [`watch::Sender::send_modify`], so the
/// watched value stays exclusively borrowed for the whole call:
/// - `Either::Left(None)`  — leave the value untouched.
/// - `Either::Left(Some(update))` — replace the value synchronously.
/// - `Either::Right(fut)`  — run `fut` to completion; if it yields
///   `Some(update)`, store that as the new value.
///
/// The future case is a hack (see the commit message): the future is
/// `block_on`-executed on a freshly spawned scoped OS thread, and the
/// current thread blocks on `join()` until it finishes. The extra thread is
/// presumably needed because `Handle::block_on` panics when called directly
/// on a runtime worker thread — TODO confirm. Note this blocks the calling
/// thread (possibly a runtime worker) and holds the watch value borrowed
/// for the entire duration of the future.
///
/// Panics if the spawned thread panics (propagated via the `expect`) or if
/// called outside a tokio runtime context (`Handle::current`).
fn state_send_modify_async<MakeFut, Fut, T>(
watch_sender: &tokio::sync::watch::Sender<T>,
async_clos: MakeFut,
) where
MakeFut: FnOnce(&mut T) -> Either<Option<T>, Fut> + Send,
Fut: Future<Output = Option<T>> + Send,
T: Send,
{
// Grab the runtime handle and current span up front; both are moved into
// the scoped thread so the future runs on this runtime and stays in the
// caller's tracing span.
let rt = tokio::runtime::Handle::current();
let span = tracing::Span::current();
watch_sender.send_modify(|current_state| {
match async_clos(current_state) {
// Nothing to do: closure already mutated (or inspected) the value in place.
Either::Left(None) => {},
// Synchronous replacement, no async work needed.
Either::Left(Some(update)) => {
*current_state = update;
}
// Async path: drive the future on a dedicated scoped thread.
Either::Right(fut) => {
let maybe_update = std::thread::scope(|scope| {
let jh = scope.spawn(|| {
rt.block_on(fut.instrument(span))
});
// Propagate a panic from the future's thread; the watch value may
// be left in whatever state the closure produced before panicking.
jh.join().expect(
"the thread that executes the closure panicked, likely self.state is poisoned now",
)
});
// Apply the post-state the future computed, if any.
match maybe_update {
None => {},
Some(update) => {*current_state = update},
}
}
}
});
}
/// Obtain a [`watch::Receiver`] that observes every update to this
/// tenant's state (see `Tenant::state`).
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    watch::Sender::subscribe(&self.state)
}