mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
refactor: introduce TenantState::Activating to avoid holding timelines lock inside Tenant::activate
This commit is contained in:
@@ -37,6 +37,8 @@ pub enum TenantState {
|
||||
Loading,
|
||||
/// This tenant is being downloaded from cloud storage.
|
||||
Attaching,
|
||||
/// The tenant is transitioning from Loading/Attaching to Active.
|
||||
Activating,
|
||||
/// Tenant is fully operational
|
||||
Active,
|
||||
/// A tenant is recognized by pageserver, but it is being detached or the
|
||||
@@ -60,6 +62,7 @@ impl TenantState {
|
||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
||||
Self::Loading => Attached,
|
||||
Self::Activating => todo!(),
|
||||
// We only reach Active after successful load / attach.
|
||||
// So, call atttachment status Attached.
|
||||
Self::Active => Attached,
|
||||
|
||||
@@ -831,7 +831,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
.await
|
||||
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
|
||||
|
||||
tenant.set_broken("broken from test".to_owned());
|
||||
tenant.set_broken("broken from test".to_owned()).await;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -625,7 +625,7 @@ impl Tenant {
|
||||
match doit.await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tenant_clone.set_broken(e.to_string());
|
||||
tenant_clone.set_broken(e.to_string()).await;
|
||||
error!("error attaching tenant: {:?}", e);
|
||||
}
|
||||
}
|
||||
@@ -894,7 +894,7 @@ impl Tenant {
|
||||
match doit.await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
tenant_clone.set_broken(err.to_string());
|
||||
tenant_clone.set_broken(err.to_string()).await;
|
||||
error!("could not load tenant {tenant_id}: {err:?}");
|
||||
}
|
||||
}
|
||||
@@ -1609,8 +1609,13 @@ impl Tenant {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut result = Ok(());
|
||||
let mut activating = false;
|
||||
self.state.send_modify(|current_state| {
|
||||
match &*current_state {
|
||||
TenantState::Activating => {
|
||||
// activate() was called on an already Activating tenant. Shouldn't happen.
|
||||
result = Err(anyhow::anyhow!("Tenant is already activating"));
|
||||
}
|
||||
TenantState::Active => {
|
||||
// activate() was called on an already Active tenant. Shouldn't happen.
|
||||
result = Err(anyhow::anyhow!("Tenant is already active"));
|
||||
@@ -1627,49 +1632,75 @@ impl Tenant {
|
||||
info!("Tenant is already in Stopping state, skipping activation");
|
||||
}
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Active;
|
||||
|
||||
*current_state = TenantState::Activating;
|
||||
debug!(tenant_id = %self.tenant_id, "Activating tenant");
|
||||
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self.tenant_id);
|
||||
|
||||
let mut activated_timelines = 0;
|
||||
|
||||
for timeline in not_broken_timelines {
|
||||
timeline.activate(broker_client, ctx);
|
||||
activated_timelines += 1;
|
||||
}
|
||||
|
||||
let elapsed = self.loading_started_at.elapsed();
|
||||
let total_timelines = timelines_accessor.len();
|
||||
|
||||
// log a lot of stuff, because some tenants sometimes suffer from user-visible
|
||||
// times to activate. see https://github.com/neondatabase/neon/issues/4025
|
||||
info!(
|
||||
since_creation_millis = elapsed.as_millis(),
|
||||
tenant_id = %self.tenant_id,
|
||||
activated_timelines,
|
||||
total_timelines,
|
||||
post_state = <&'static str>::from(&*current_state),
|
||||
"activation attempt finished"
|
||||
);
|
||||
activating = true;
|
||||
// Continue outside the closure. We need to grab timelines.lock()
|
||||
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
|
||||
}
|
||||
}
|
||||
});
|
||||
result
|
||||
if let Err(e) = result {
|
||||
assert!(!activating, "transition into Activating is infallible");
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if activating {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self.tenant_id);
|
||||
|
||||
let mut activated_timelines = 0;
|
||||
|
||||
for timeline in not_broken_timelines {
|
||||
timeline.activate(broker_client, ctx);
|
||||
activated_timelines += 1;
|
||||
}
|
||||
|
||||
self.state.send_modify(move |current_state| {
|
||||
assert!(
|
||||
*current_state == TenantState::Activating,
|
||||
"set_stopping and set_broken wait for us to leave Activating state",
|
||||
);
|
||||
*current_state = TenantState::Active;
|
||||
|
||||
let elapsed = self.loading_started_at.elapsed();
|
||||
let total_timelines = timelines_accessor.len();
|
||||
|
||||
// log a lot of stuff, because some tenants sometimes suffer from user-visible
|
||||
// times to activate. see https://github.com/neondatabase/neon/issues/4025
|
||||
info!(
|
||||
since_creation_millis = elapsed.as_millis(),
|
||||
tenant_id = %self.tenant_id,
|
||||
activated_timelines,
|
||||
total_timelines,
|
||||
post_state = <&'static str>::from(&*current_state),
|
||||
"activation attempt finished"
|
||||
);
|
||||
});
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Change tenant status to Stopping, to mark that it is being shut down
|
||||
pub fn set_stopping(&self) {
|
||||
pub async fn set_stopping(&self) {
|
||||
// Get the rx before checking state inside send_if_modified.
|
||||
// This way, when we later rx.changed().await, we won't have missed
|
||||
// any state changes.
|
||||
let mut rx = self.state.subscribe();
|
||||
while *rx.borrow() == TenantState::Activating {
|
||||
rx.changed()
|
||||
.await
|
||||
.expect("we're a method on Tenant, so, we're keeping self.state alive here");
|
||||
}
|
||||
self.state.send_modify(|current_state| {
|
||||
match current_state {
|
||||
TenantState::Activating => unreachable!("we checked above and never transition back into Activating state"),
|
||||
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Stopping;
|
||||
|
||||
@@ -1694,12 +1725,21 @@ impl Tenant {
|
||||
info!("Tenant is already in Stopping state");
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_broken(&self, reason: String) {
|
||||
pub async fn set_broken(&self, reason: String) {
|
||||
let mut rx = self.state.subscribe();
|
||||
while *rx.borrow() == TenantState::Activating {
|
||||
rx.changed()
|
||||
.await
|
||||
.expect("we're a method on Tenant, so, we're keeping self.state alive here");
|
||||
}
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating => {
|
||||
unreachable!("we checked above and never transition back into Activating state")
|
||||
}
|
||||
TenantState::Active => {
|
||||
// Broken tenants can currently only used for fatal errors that happen
|
||||
// while loading or attaching a tenant. A tenant that has already been
|
||||
@@ -1737,7 +1777,7 @@ impl Tenant {
|
||||
loop {
|
||||
let current_state = receiver.borrow_and_update().clone();
|
||||
match current_state {
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Loading | TenantState::Attaching | TenantState::Activating => {
|
||||
// in these states, there's a chance that we can reach ::Active
|
||||
receiver.changed().await?;
|
||||
}
|
||||
|
||||
@@ -248,7 +248,7 @@ pub async fn shutdown_all_tenants() {
|
||||
for (_, tenant) in tenants_to_shut_down {
|
||||
if tenant.is_active() {
|
||||
// updates tenant state, forbidding new GC and compaction iterations from starting
|
||||
tenant.set_stopping();
|
||||
tenant.set_stopping().await;
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
}
|
||||
@@ -575,8 +575,9 @@ where
|
||||
Some(tenant) => match tenant.current_state() {
|
||||
TenantState::Attaching
|
||||
| TenantState::Loading
|
||||
| TenantState::Activating
|
||||
| TenantState::Broken { .. }
|
||||
| TenantState::Active => tenant.set_stopping(),
|
||||
| TenantState::Active => tenant.set_stopping().await,
|
||||
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
|
||||
},
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
@@ -603,7 +604,7 @@ where
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
tenant.set_broken(e.to_string());
|
||||
tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
None => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
|
||||
Reference in New Issue
Block a user