diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 07d1618272..3010cb6f57 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -39,6 +39,8 @@ use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; +use super::secondary::SecondaryTenant; +use super::storage_layer::Layer; use super::timeline::delete::DeleteTimelineFlow; use super::TenantSharedResources; @@ -53,7 +55,7 @@ use super::TenantSharedResources; /// having a properly acquired generation (Secondary doesn't need a generation) pub(crate) enum TenantSlot { Attached(Arc), - Secondary, + Secondary(Arc), /// In this state, other administrative operations acting on the TenantId should /// block, or return a retry indicator equivalent to HTTP 503. InProgress(utils::completion::Barrier), @@ -63,7 +65,7 @@ impl std::fmt::Debug for TenantSlot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), - Self::Secondary => write!(f, "Secondary"), + Self::Secondary(_) => write!(f, "Secondary"), Self::InProgress(_) => write!(f, "InProgress"), } } @@ -74,7 +76,7 @@ impl TenantSlot { fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::Secondary(_) => None, Self::InProgress(_) => None, } } @@ -427,7 +429,10 @@ pub async fn init_tenant_mgr( // tenants, because they do no remote writes and hence require no // generation number info!(%tenant_id, "Loaded tenant in secondary mode"); - tenants.insert(tenant_id, TenantSlot::Secondary); + tenants.insert( + tenant_id, + TenantSlot::Secondary(SecondaryTenant::new(tenant_id)), + ); } LocationMode::Attached(_) => { // TODO: augment re-attach API to enable the control plane to @@ -586,8 +591,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { tenants_to_shut_down.push(t.clone()); 
shutdown_state.insert(k, TenantSlot::Attached(t)); } - TenantSlot::Secondary => { - shutdown_state.insert(k, TenantSlot::Secondary); + TenantSlot::Secondary(state) => { + // We don't need to wait for this individually per-tenant: the + // downloader task will be waited on eventually, this cancel + // is just to encourage it to drop out if it is doing work + // for this tenant right now. + state.cancel.cancel(); + + shutdown_state.insert(k, TenantSlot::Secondary(state)); } TenantSlot::InProgress(notify) => { // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will @@ -764,6 +775,12 @@ pub(crate) async fn set_new_tenant_config( } impl TenantManager { + /// As a convenience to avoid carrying a configuration reference separately, anyone who + /// has a reference to the TenantManager can access configuration this way. + pub(crate) fn get_conf(&self) -> &'static PageServerConf { + self.conf + } + #[instrument(skip_all, fields(%tenant_id))] pub(crate) async fn upsert_location( &self, @@ -811,43 +828,57 @@ impl TenantManager { // not do significant I/O, and shutdowns should be prompt via cancellation tokens. let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; - if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { - // The case where we keep a Tenant alive was covered above in the special case - // for Attached->Attached transitions in the same generation. By this point, - // if we see an attached tenant we know it will be discarded and should be - // shut down. - let (_guard, progress) = utils::completion::channel(); + match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. 
+ let (_guard, progress) = utils::completion::channel(); - match tenant.get_attach_mode() { - AttachmentMode::Single | AttachmentMode::Multi => { - // Before we leave our state as the presumed holder of the latest generation, - // flush any outstanding deletions to reduce the risk of leaking objects. - self.resources.deletion_queue_client.flush_advisory() - } - AttachmentMode::Stale => { - // If we're stale there's not point trying to flush deletions - } - }; + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + self.resources.deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's no point trying to flush deletions + } + }; - info!("Shutting down attached tenant"); - match tenant.shutdown(progress, false).await { - Ok(()) => {} - Err(barrier) => { - info!("Shutdown already in progress, waiting for it to complete"); - barrier.wait().await; + info!("Shutting down attached tenant"); + match tenant.shutdown(progress, false).await { + Ok(()) => {} + Err(barrier) => { + info!("Shutdown already in progress, waiting for it to complete"); + barrier.wait().await; + } } + slot_guard.drop_old_value().expect("We just shut it down"); + } + Some(TenantSlot::Secondary(state)) => { + info!("Shutting down secondary tenant"); + state.shutdown().await; + } + Some(TenantSlot::InProgress(_)) => { + // This should never happen: acquire_slot should error out + // if the contents of a slot were InProgress. + anyhow::bail!("Acquired an InProgress slot, this is a bug.") + } + None => { + // Slot was vacant, nothing needs shutting down. 
} - slot_guard.drop_old_value().expect("We just shut it down"); } let tenant_path = self.conf.tenant_path(&tenant_id); + let timelines_path = self.conf.timelines_path(&tenant_id); let new_slot = match &new_location_config.mode { LocationMode::Secondary(_) => { - let tenant_path = self.conf.tenant_path(&tenant_id); // Directory doesn't need to be fsync'd because if we crash it can // safely be recreated next time this tenant location is configured. - unsafe_create_dir_all(&tenant_path) + unsafe_create_dir_all(&timelines_path) .await .with_context(|| format!("Creating {tenant_path}"))?; @@ -855,11 +886,9 @@ impl TenantManager { .await .map_err(SetNewTenantConfigError::Persist)?; - TenantSlot::Secondary + TenantSlot::Secondary(SecondaryTenant::new(tenant_id)) } LocationMode::Attached(_attach_config) => { - let timelines_path = self.conf.timelines_path(&tenant_id); - // Directory doesn't need to be fsync'd because we do not depend on // it to exist after crashes: it may be recreated when tenant is // re-attached, see https://github.com/neondatabase/neon/issues/5550 @@ -891,6 +920,97 @@ impl TenantManager { Ok(()) } + + // Do some synchronous work for all tenant slots in Secondary state. The provided + // callback should be small and fast, as it will be called inside the global + // TenantsMap lock. 
+ pub(crate) fn foreach_secondary_tenants(&self, mut func: F) + where + // TODO: let the callback return a hint to drop out of the loop early + F: FnMut(&TenantId, &Arc), + { + let locked = self.tenants.read().unwrap(); + + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return, + TenantsMap::Open(m) => m, + }; + + for (tenant_id, slot) in map { + if let TenantSlot::Secondary(state) = slot { + // Only expose secondary tenants that are not currently shutting down + if !state.cancel.is_cancelled() { + func(tenant_id, state) + } + } + } + } + + pub(crate) fn get_attached_tenants(&self) -> Vec> { + let locked = self.tenants.read().unwrap(); + + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return Vec::new(), + TenantsMap::Open(m) => m, + }; + + map.iter() + .filter_map(|(_k, v)| match v { + TenantSlot::Attached(t) => Some(t.clone()), + _ => None, + }) + .collect() + } + + /// Having planned some evictions for a tenant, attempt to execute them. + /// + /// Execution will not occur if the TenantSlot for this tenant is not in + /// a state suitable to execute. + // TODO: is Layer really needed here? Maybe we should have reduced to a LayerFileName by this point. + pub(crate) async fn evict_tenant_layers( + &self, + tenant_id: &TenantId, + timeline_layers: Vec<(TimelineId, Layer)>, + ) { + // TODO: unify with how we evict for attached tenants. They should also + // pass through here, to avoid attached tenant evictions racing with + // the lifetime of secondary locations for the same tenant ID. 
+ + let state = { + let locked = self.tenants.read().unwrap(); + let map = match &*locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return, + TenantsMap::Open(m) => m, + }; + + match map.get(tenant_id) { + Some(TenantSlot::Secondary(secondary_state)) => { + // Found a secondary as expected + secondary_state.clone() + } + _ => { + // A location configuration change raced with this eviction + tracing::info!( + "Dropping {} layer evictions, tenant not in suitable state", + timeline_layers.len() + ); + return; + } + } + }; + + // Concurrency: downloads might have been going on while we deleted layers. However, + // we are only deleting layers that the SecondaryTenant already thought were on disk, + // so we won't be deleting anything that it is _currently_ downloading. All deletions + // of SecondaryTenant layers flow through this function, so there is no risk that the + // layer we're evicting is no longer present in-memory. + state + .evict_layers(self.conf, tenant_id, timeline_layers) + .instrument(tracing::info_span!("evict_layers", + %tenant_id + )) + .await; + } } #[derive(Debug, thiserror::Error)] @@ -937,7 +1057,7 @@ pub(crate) fn get_tenant( } }, Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_id)), - None | Some(TenantSlot::Secondary) => Err(GetTenantError::NotFound(tenant_id)), + None | Some(TenantSlot::Secondary(_)) => Err(GetTenantError::NotFound(tenant_id)), } } @@ -996,7 +1116,7 @@ pub(crate) async fn get_active_tenant_with_timeout( _ => WaitFor::Tenant(tenant.clone()), } } - Some(TenantSlot::Secondary) => { + Some(TenantSlot::Secondary(_)) => { return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( tenant_id, ))) @@ -1303,7 +1423,7 @@ pub(crate) async fn list_tenants() -> Result, Tenan Ok(m.iter() .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), - TenantSlot::Secondary => None, + TenantSlot::Secondary(_) => None, 
TenantSlot::InProgress(_) => None, }) .collect()) @@ -1553,11 +1673,7 @@ impl SlotGuard { fn old_value_is_shutdown(&self) -> bool { match self.old_value.as_ref() { Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(), - Some(TenantSlot::Secondary) => { - // TODO: when adding secondary mode tenants, this will check for shutdown - // in the same way that we do for `Tenant` above - true - } + Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(), Some(TenantSlot::InProgress(_)) => { // A SlotGuard cannot be constructed for a slot that was already InProgress unreachable!() @@ -1761,26 +1877,19 @@ where let mut slot_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; - // The SlotGuard allows us to manipulate the Tenant object without fear of some - // concurrent API request doing something else for the same tenant ID. - let attached_tenant = match slot_guard.get_old_value() { - Some(TenantSlot::Attached(t)) => Some(t), - _ => None, - }; - // allow pageserver shutdown to await for our completion let (_guard, progress) = completion::channel(); - // If the tenant was attached, shut it down gracefully. For secondary - // locations this part is not necessary - match &attached_tenant { - Some(attached_tenant) => { + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match slot_guard.get_old_value() { + Some(TenantSlot::Attached(tenant)) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so // that we can continue safely to cleanup. 
- match attached_tenant.shutdown(progress, freeze_and_flush).await { + match tenant.shutdown(progress, freeze_and_flush).await { Ok(()) => {} Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to @@ -1789,11 +1898,19 @@ where return Err(TenantStateError::IsStopping(tenant_id)); } } + Some(tenant) } - None => { - // Nothing to wait on when not attached, proceed. + Some(TenantSlot::Secondary(secondary_state)) => { + tracing::info!("Shutting down in secondary mode"); + secondary_state.shutdown().await; + None } - } + Some(TenantSlot::InProgress(_)) => { + // Acquiring a slot guarantees its old value was not InProgress + unreachable!(); + } + None => None, + }; match tenant_cleanup .await