From 02f94edb60a7f4d290cbaf424436245a179b1859 Mon Sep 17 00:00:00 2001
From: Trung Dinh
Date: Tue, 10 Jun 2025 02:26:40 -0700
Subject: [PATCH] Remove global static TENANTS (#12169)

## Problem

There is this TODO in the code:
https://github.com/neondatabase/neon/blob/main/pageserver/src/tenant/mgr.rs#L300-L302

This is an old TODO by @jcsp.

## Summary of changes

This PR addresses the TODO: it removes the global static `TENANTS`, and the
`TenantManager` now owns and manages the tenant map directly, improving
encapsulation. Concretely, the affected module-level functions move into the
`TenantManager` implementation.
---
 pageserver/src/bin/pageserver.rs |   7 +-
 pageserver/src/tenant/mgr.rs     | 725 ++++++++++++++++---------------
 2 files changed, 388 insertions(+), 344 deletions(-)

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 5cd865f53e..417503089a 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -573,7 +573,8 @@ fn start_pageserver(
         tokio::sync::mpsc::unbounded_channel();
     let deletion_queue_client = deletion_queue.new_client();
     let background_purges = mgr::BackgroundPurges::default();
-    let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
+
+    let tenant_manager = mgr::init(
         conf,
         background_purges.clone(),
         TenantSharedResources {
@@ -584,10 +585,10 @@ fn start_pageserver(
             basebackup_prepare_sender,
             feature_resolver,
         },
-        order,
         shutdown_pageserver.clone(),
-    ))?;
+    );
     let tenant_manager = Arc::new(tenant_manager);
+    BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
 
     let basebackup_cache = BasebackupCache::spawn(
         BACKGROUND_RUNTIME.handle(),

diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 186e0f4cdb..4aa459e923 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -12,7 +12,6 @@ use anyhow::Context;
 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
 use itertools::Itertools;
-use once_cell::sync::Lazy;
 use pageserver_api::key::Key;
 use pageserver_api::models::{DetachBehavior, LocationConfigMode};
 use pageserver_api::shard::{
@@ -103,7 +102,7 @@ pub(crate) enum TenantsMap {
     /// [`init_tenant_mgr`] is not done yet.
     Initializing,
     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
-    /// New tenants can be added using [`tenant_map_acquire_slot`].
+    /// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
     Open(BTreeMap<TenantShardId, TenantSlot>),
     /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
     /// Existing tenants are still accessible, but no new tenants can be created.
@@ -284,9 +283,6 @@ impl BackgroundPurges {
     }
 }
 
-static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
-    Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
-
 /// Responsible for storing and mutating the collection of all tenants
 /// that this pageserver has state for.
 ///
 /// and attached modes concurrently.
 pub struct TenantManager {
     conf: &'static PageServerConf,
-    // TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
-    // out of that static variable, the TenantManager can own this.
-    // See https://github.com/neondatabase/neon/issues/5796
-    tenants: &'static std::sync::RwLock<TenantsMap>,
+    tenants: std::sync::RwLock<TenantsMap>,
     resources: TenantSharedResources,
     // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
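The core of the change is visible in the struct above: state that used to live in a process-wide `static` is now owned by the manager and only reachable through its methods. To see the shape of that refactor in isolation, here is a minimal sketch (not part of the diff) with invented stand-in types — `u32` keys and `String` values rather than the real pageserver types:

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

// Before (sketch): free functions reached for a process-wide global, e.g.
//   static TENANTS: Lazy<RwLock<TenantsMap>> = ...;
// After (sketch): the manager owns the map outright.
struct TenantManager {
    tenants: RwLock<BTreeMap<u32, String>>,
}

impl TenantManager {
    fn new() -> Self {
        Self {
            tenants: RwLock::new(BTreeMap::new()),
        }
    }

    // All access goes through `&self`, so tests can build as many
    // independent managers as they like instead of sharing one global.
    fn attach(&self, id: u32, name: &str) {
        self.tenants.write().unwrap().insert(id, name.to_owned());
    }

    fn len(&self) -> usize {
        self.tenants.read().unwrap().len()
    }
}

fn main() {
    let mgr = TenantManager::new();
    mgr.attach(1, "tenant-one");
    assert_eq!(mgr.len(), 1);
}
```

Besides the abstraction win, owning the lock also lets the manager's lifetime bound borrows of the map (see the `SlotGuard<'a>` change later in this diff), which a `&'static` global could never express.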
@@ -479,21 +472,43 @@ pub(crate) enum DeleteTenantError {
     Other(#[from] anyhow::Error),
 }
 
-/// Initialize repositories with locally available timelines.
+/// Initialize the `TenantManager` in the `Initializing` state.
+pub fn init(
+    conf: &'static PageServerConf,
+    background_purges: BackgroundPurges,
+    resources: TenantSharedResources,
+    cancel: CancellationToken,
+) -> TenantManager {
+    TenantManager {
+        conf,
+        tenants: std::sync::RwLock::new(TenantsMap::Initializing),
+        resources,
+        cancel,
+        background_purges,
+    }
+}
+
+/// Transition repositories from the `Initializing` state to the `Open` state with locally available timelines.
 /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
 /// are scheduled for download and added to the tenant once download is completed.
 #[instrument(skip_all)]
 pub async fn init_tenant_mgr(
-    conf: &'static PageServerConf,
-    background_purges: BackgroundPurges,
-    resources: TenantSharedResources,
+    tenant_manager: Arc<TenantManager>,
     init_order: InitializationOrder,
-    cancel: CancellationToken,
-) -> anyhow::Result<TenantManager> {
+) -> anyhow::Result<()> {
+    debug_assert!(matches!(
+        *tenant_manager.tenants.read().unwrap(),
+        TenantsMap::Initializing
+    ));
     let mut tenants = BTreeMap::new();
 
     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
 
+    let conf = tenant_manager.conf;
+    let resources = &tenant_manager.resources;
+    let cancel = &tenant_manager.cancel;
+    let background_purges = &tenant_manager.background_purges;
+
     // Initialize dynamic limits that depend on system resources
     let system_memory =
         sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
@@ -512,7 +527,7 @@ pub async fn init_tenant_mgr(
     let tenant_configs = init_load_tenant_configs(conf).await;
 
     // Determine which tenants are to be secondary or attached, and in which generation
-    let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
+    let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
 
     tracing::info!(
         "Attaching {} tenants at startup, warming up {} at a time",
@@ -669,18 +684,10 @@ pub async fn init_tenant_mgr(
 
     info!("Processed {} local tenants at startup", tenants.len());
 
-    let mut tenants_map = TENANTS.write().unwrap();
-    assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
+    let mut tenant_map = tenant_manager.tenants.write().unwrap();
+    *tenant_map = TenantsMap::Open(tenants);
 
-    *tenants_map = TenantsMap::Open(tenants);
-
-    Ok(TenantManager {
-        conf,
-        tenants: &TENANTS,
-        resources,
-        cancel: CancellationToken::new(),
-        background_purges,
-    })
+    Ok(())
 }
 
 /// Wrapper for Tenant::spawn that checks invariants before running
@@ -719,142 +726,6 @@ fn tenant_spawn(
     )
 }
 
-async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
-    let mut join_set = JoinSet::new();
-
-    #[cfg(all(debug_assertions, not(test)))]
-    {
-        // Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
-        // as it happens implicitly at the end of tests etc.
-        let m = tenants.read().unwrap();
-        debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
-    }
-
-    // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
-    let (total_in_progress, total_attached) = {
-        let mut m = tenants.write().unwrap();
-        match &mut *m {
-            TenantsMap::Initializing => {
-                *m = TenantsMap::ShuttingDown(BTreeMap::default());
-                info!("tenants map is empty");
-                return;
-            }
-            TenantsMap::Open(tenants) => {
-                let mut shutdown_state = BTreeMap::new();
-                let mut total_in_progress = 0;
-                let mut total_attached = 0;
-
-                for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
-                    match v {
-                        TenantSlot::Attached(t) => {
-                            shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
-                            join_set.spawn(
-                                async move {
-                                    let res = {
-                                        let (_guard, shutdown_progress) = completion::channel();
-                                        t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
-                                    };
-
-                                    if let Err(other_progress) = res {
-                                        // join the another shutdown in progress
-                                        other_progress.wait().await;
-                                    }
-
-                                    // we cannot afford per tenant logging here, because if s3 is degraded, we are
-                                    // going to log too many lines
-                                    debug!("tenant successfully stopped");
-                                }
-                                .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
-                            );
-
-                            total_attached += 1;
-                        }
-                        TenantSlot::Secondary(state) => {
-                            // We don't need to wait for this individually per-tenant: the
-                            // downloader task will be waited on eventually, this cancel
-                            // is just to encourage it to drop out if it is doing work
-                            // for this tenant right now.
-                            state.cancel.cancel();
-
-                            shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
-                        }
-                        TenantSlot::InProgress(notify) => {
-                            // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
-                            // wait for their notifications to fire in this function.
-                            join_set.spawn(async move {
-                                notify.wait().await;
-                            });
-
-                            total_in_progress += 1;
-                        }
-                    }
-                }
-                *m = TenantsMap::ShuttingDown(shutdown_state);
-                (total_in_progress, total_attached)
-            }
-            TenantsMap::ShuttingDown(_) => {
-                error!(
-                    "already shutting down, this function isn't supposed to be called more than once"
-                );
-                return;
-            }
-        }
-    };
-
-    let started_at = std::time::Instant::now();
-
-    info!(
-        "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
-        total_in_progress, total_attached
-    );
-
-    let total = join_set.len();
-    let mut panicked = 0;
-    let mut buffering = true;
-    const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
-    let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
-
-    while !join_set.is_empty() {
-        tokio::select! {
-            Some(joined) = join_set.join_next() => {
-                match joined {
-                    Ok(()) => {},
-                    Err(join_error) if join_error.is_cancelled() => {
-                        unreachable!("we are not cancelling any of the tasks");
-                    }
-                    Err(join_error) if join_error.is_panic() => {
-                        // cannot really do anything, as this panic is likely a bug
-                        panicked += 1;
-                    }
-                    Err(join_error) => {
-                        warn!("unknown kind of JoinError: {join_error}");
-                    }
-                }
-                if !buffering {
-                    // buffer so that every 500ms since the first update (or starting) we'll log
-                    // how far away we are; this is because we will get SIGKILL'd at 10s, and we
-                    // are not able to log *then*.
-                    buffering = true;
-                    buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
-                }
-            },
-            _ = &mut buffered, if buffering => {
-                buffering = false;
-                info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
-            }
-        }
-    }
-
-    if panicked > 0 {
-        warn!(
-            panicked,
-            total, "observed panicks while shutting down tenants"
-        );
-    }
-
-    // caller will log how long we took
-}
-
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum UpsertLocationError {
     #[error("Bad config request: {0}")]
@@ -1056,7 +927,8 @@ impl TenantManager {
         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
         // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
-        let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
+        let mut slot_guard = self
+            .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
             .map_err(|e| match e {
                 TenantSlotError::NotFound(_) => {
                     unreachable!("Called with mode Any")
@@ -1223,6 +1095,75 @@ impl TenantManager {
         }
     }
 
+    fn tenant_map_acquire_slot(
+        &self,
+        tenant_shard_id: &TenantShardId,
+        mode: TenantSlotAcquireMode,
+    ) -> Result<SlotGuard<'_>, TenantSlotError> {
+        use TenantSlotAcquireMode::*;
+        METRICS.tenant_slot_writes.inc();
+
+        let mut locked = self.tenants.write().unwrap();
+        let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
+        let _guard = span.enter();
+
+        let m = match &mut *locked {
+            TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
+            TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
+            TenantsMap::Open(m) => m,
+        };
+
+        use std::collections::btree_map::Entry;
+
+        let entry = m.entry(*tenant_shard_id);
+
+        match entry {
+            Entry::Vacant(v) => match mode {
+                MustExist => {
+                    tracing::debug!("Vacant && MustExist: return NotFound");
+                    Err(TenantSlotError::NotFound(*tenant_shard_id))
+                }
+                _ => {
+                    let (completion, barrier) = utils::completion::channel();
+                    let inserting = TenantSlot::InProgress(barrier);
+                    METRICS.slot_inserted(&inserting);
+                    v.insert(inserting);
+                    tracing::debug!("Vacant, inserted InProgress");
+                    Ok(SlotGuard::new(
+                        *tenant_shard_id,
+                        None,
+                        completion,
+                        &self.tenants,
+                    ))
+                }
+            },
+            Entry::Occupied(mut o) => {
+                // Apply mode-driven checks
+                match (o.get(), mode) {
+                    (TenantSlot::InProgress(_), _) => {
+                        tracing::debug!("Occupied, failing for InProgress");
+                        Err(TenantSlotError::InProgress)
+                    }
+                    _ => {
+                        // Happy case: the slot was not in any state that violated our mode
+                        let (completion, barrier) = utils::completion::channel();
+                        let in_progress = TenantSlot::InProgress(barrier);
+                        METRICS.slot_inserted(&in_progress);
+                        let old_value = o.insert(in_progress);
+                        METRICS.slot_removed(&old_value);
+                        tracing::debug!("Occupied, replaced with InProgress");
+                        Ok(SlotGuard::new(
+                            *tenant_shard_id,
+                            Some(old_value),
+                            completion,
+                            &self.tenants,
+                        ))
+                    }
+                }
+            }
+        }
+    }
+
     /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
     /// LocationConf that was last used to attach it. Optionally, the local file cache may be
     /// dropped before re-attaching.
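The method above parks an `InProgress` marker in the slot and hands the caller a guard that remembers the old value. Reduced to a hedged sketch (not part of the diff; stand-in types, no metrics or completion barrier), the idea looks like this:

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

enum Slot {
    Attached(&'static str),
    InProgress,
}

struct Manager {
    tenants: RwLock<BTreeMap<u32, Slot>>,
}

// Guard marking one slot as busy; restores or clears the slot on drop.
struct SlotGuard<'a> {
    id: u32,
    old_value: Option<Slot>,
    tenants: &'a RwLock<BTreeMap<u32, Slot>>,
}

impl Manager {
    fn acquire_slot(&self, id: u32) -> Result<SlotGuard<'_>, &'static str> {
        let mut m = self.tenants.write().unwrap();
        if let Some(Slot::InProgress) = m.get(&id) {
            return Err("operation already in flight for this slot");
        }
        // Occupy the slot with a marker; remember whatever was there before.
        let old_value = m.insert(id, Slot::InProgress);
        Ok(SlotGuard { id, old_value, tenants: &self.tenants })
    }
}

impl Drop for SlotGuard<'_> {
    fn drop(&mut self) {
        // Release the `InProgress` marker: put the old value back, or clear.
        let mut m = self.tenants.write().unwrap();
        match self.old_value.take() {
            Some(v) => { m.insert(self.id, v); }
            None => { m.remove(&self.id); }
        }
    }
}

fn main() {
    let mgr = Manager {
        tenants: RwLock::new(BTreeMap::from([(7, Slot::Attached("t7"))])),
    };
    let guard = mgr.acquire_slot(7).expect("first acquisition succeeds");
    assert!(mgr.acquire_slot(7).is_err()); // concurrent acquisition is refused
    drop(guard); // restores the Attached slot
    if let Some(Slot::Attached(name)) = mgr.tenants.read().unwrap().get(&7) {
        println!("slot restored to Attached({name})");
    }
}
```

Note how the borrowed `tenants` reference in the sketch's guard mirrors the new `SlotGuard<'a>` in this patch: because the manager owns the lock, the guard can hold `&'a RwLock<...>` instead of reaching for a global in its `Drop` impl.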
@@ -1239,7 +1180,8 @@ impl TenantManager {
         drop_cache: bool,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+        let mut slot_guard =
+            self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
         let Some(old_slot) = slot_guard.get_old_value() else {
             anyhow::bail!("Tenant not found when trying to reset");
         };
@@ -1388,7 +1330,8 @@ impl TenantManager {
             Ok(())
         }
 
-        let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+        let slot_guard =
+            self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
         match &slot_guard.old_value {
             Some(TenantSlot::Attached(tenant)) => {
                 // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
@@ -1539,7 +1482,7 @@ impl TenantManager {
         // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
         drop(tenant);
         let mut parent_slot_guard =
-            tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+            self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
         let parent = match parent_slot_guard.get_old_value() {
             Some(TenantSlot::Attached(t)) => t,
             Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
@@ -1843,7 +1786,145 @@ impl TenantManager {
     pub(crate) async fn shutdown(&self) {
         self.cancel.cancel();
 
-        shutdown_all_tenants0(self.tenants).await
+        self.shutdown_all_tenants0().await
+    }
+
+    async fn shutdown_all_tenants0(&self) {
+        let mut join_set = JoinSet::new();
+
+        #[cfg(all(debug_assertions, not(test)))]
+        {
+            // Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
+            // as it happens implicitly at the end of tests etc.
+            let m = self.tenants.read().unwrap();
+            debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
+        }
+
+        // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
+        let (total_in_progress, total_attached) = {
+            let mut m = self.tenants.write().unwrap();
+            match &mut *m {
+                TenantsMap::Initializing => {
+                    *m = TenantsMap::ShuttingDown(BTreeMap::default());
+                    info!("tenants map is empty");
+                    return;
+                }
+                TenantsMap::Open(tenants) => {
+                    let mut shutdown_state = BTreeMap::new();
+                    let mut total_in_progress = 0;
+                    let mut total_attached = 0;
+
+                    for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
+                        match v {
+                            TenantSlot::Attached(t) => {
+                                shutdown_state
+                                    .insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
+                                join_set.spawn(
+                                    async move {
+                                        let res = {
+                                            let (_guard, shutdown_progress) = completion::channel();
+                                            t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
+                                        };
+
+                                        if let Err(other_progress) = res {
+                                            // join the other shutdown in progress
+                                            other_progress.wait().await;
+                                        }
+
+                                        // we cannot afford per tenant logging here, because if s3 is degraded, we are
+                                        // going to log too many lines
+                                        debug!("tenant successfully stopped");
+                                    }
+                                    .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
+                                );
+
+                                total_attached += 1;
+                            }
+                            TenantSlot::Secondary(state) => {
+                                // We don't need to wait for this individually per-tenant: the
+                                // downloader task will be waited on eventually, this cancel
+                                // is just to encourage it to drop out if it is doing work
+                                // for this tenant right now.
+                                state.cancel.cancel();
+
+                                shutdown_state
+                                    .insert(tenant_shard_id, TenantSlot::Secondary(state));
+                            }
+                            TenantSlot::InProgress(notify) => {
+                                // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
+                                // wait for their notifications to fire in this function.
+                                join_set.spawn(async move {
+                                    notify.wait().await;
+                                });
+
+                                total_in_progress += 1;
+                            }
+                        }
+                    }
+                    *m = TenantsMap::ShuttingDown(shutdown_state);
+                    (total_in_progress, total_attached)
+                }
+                TenantsMap::ShuttingDown(_) => {
+                    error!(
+                        "already shutting down, this function isn't supposed to be called more than once"
+                    );
+                    return;
+                }
+            }
+        };
+
+        let started_at = std::time::Instant::now();
+
+        info!(
+            "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
+            total_in_progress, total_attached
+        );
+
+        let total = join_set.len();
+        let mut panicked = 0;
+        let mut buffering = true;
+        const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
+        let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
+
+        while !join_set.is_empty() {
+            tokio::select! {
+                Some(joined) = join_set.join_next() => {
+                    match joined {
+                        Ok(()) => {},
+                        Err(join_error) if join_error.is_cancelled() => {
+                            unreachable!("we are not cancelling any of the tasks");
+                        }
+                        Err(join_error) if join_error.is_panic() => {
+                            // cannot really do anything, as this panic is likely a bug
+                            panicked += 1;
+                        }
+                        Err(join_error) => {
+                            warn!("unknown kind of JoinError: {join_error}");
+                        }
+                    }
+                    if !buffering {
+                        // buffer so that every 500ms since the first update (or starting) we'll log
+                        // how far away we are; this is because we will get SIGKILL'd at 10s, and we
+                        // are not able to log *then*.
+                        buffering = true;
+                        buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
+                    }
+                },
+                _ = &mut buffered, if buffering => {
+                    buffering = false;
+                    info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
+                }
+            }
+        }
+
+        if panicked > 0 {
+            warn!(
+                panicked,
+                total, "observed panics while shutting down tenants"
+            );
+        }
+
+        // caller will log how long we took
     }
 
     /// Detaches a tenant, and removes its local files asynchronously.
@@ -1889,12 +1970,12 @@ impl TenantManager {
                 .map(Some)
         };
 
-        let mut removal_result = remove_tenant_from_memory(
-            self.tenants,
-            tenant_shard_id,
-            tenant_dir_rename_operation(tenant_shard_id),
-        )
-        .await;
+        let mut removal_result = self
+            .remove_tenant_from_memory(
+                tenant_shard_id,
+                tenant_dir_rename_operation(tenant_shard_id),
+            )
+            .await;
 
         // If the tenant was not found, it was likely already removed. Attempt to remove the tenant
         // directory on disk anyway. For example, during shard splits, we shut down and remove the
@@ -1948,17 +2029,16 @@ impl TenantManager {
     ) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
         use detach_ancestor::Error;
 
-        let slot_guard =
-            tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
-                |e| {
-                    use TenantSlotError::*;
+        let slot_guard = self
+            .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
+            .map_err(|e| {
+                use TenantSlotError::*;
 
-                    match e {
-                        MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
-                        NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
-                    }
-                },
-            )?;
+                match e {
+                    MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
+                    NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
+                }
+            })?;
 
         let tenant = {
             let old_slot = slot_guard
@@ -2291,6 +2371,80 @@ impl TenantManager {
                 other => ApiError::InternalServerError(anyhow::anyhow!(other)),
             })
     }
+
+    /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
+    /// Allows removing other tenant resources manually, via `tenant_cleanup`.
+    /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
+    /// operation would be needed to remove it.
+    async fn remove_tenant_from_memory<V, F>(
+        &self,
+        tenant_shard_id: TenantShardId,
+        tenant_cleanup: F,
+    ) -> Result<V, TenantStateError>
+    where
+        F: std::future::Future<Output = anyhow::Result<V>>,
+    {
+        let mut slot_guard =
+            self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
+
+        // allow pageserver shutdown to await for our completion
+        let (_guard, progress) = completion::channel();
+
+        // The SlotGuard allows us to manipulate the Tenant object without fear of some
+        // concurrent API request doing something else for the same tenant ID.
+        let attached_tenant = match slot_guard.get_old_value() {
+            Some(TenantSlot::Attached(tenant)) => {
+                // whenever we remove a tenant from memory, we don't want to flush and wait for upload
+                let shutdown_mode = ShutdownMode::Hard;
+
+                // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
+                // that we can continue safely to cleanup.
+                match tenant.shutdown(progress, shutdown_mode).await {
+                    Ok(()) => {}
+                    Err(_other) => {
+                        // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
+                        // wait for it but return an error right away because these are distinct requests.
+                        slot_guard.revert();
+                        return Err(TenantStateError::IsStopping(tenant_shard_id));
+                    }
+                }
+                Some(tenant)
+            }
+            Some(TenantSlot::Secondary(secondary_state)) => {
+                tracing::info!("Shutting down in secondary mode");
+                secondary_state.shutdown().await;
+                None
+            }
+            Some(TenantSlot::InProgress(_)) => {
+                // Acquiring a slot guarantees its old value was not InProgress
+                unreachable!();
+            }
+            None => None,
+        };
+
+        match tenant_cleanup
+            .await
+            .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
+        {
+            Ok(hook_value) => {
+                // Success: drop the old TenantSlot::Attached.
+                slot_guard
+                    .drop_old_value()
+                    .expect("We just called shutdown");
+
+                Ok(hook_value)
+            }
+            Err(e) => {
+                // If we had a Tenant, set it to Broken and put it back in the TenantsMap
+                if let Some(attached_tenant) = attached_tenant {
+                    attached_tenant.set_broken(e.to_string()).await;
+                }
+                // Leave the broken tenant in the map
+                slot_guard.revert();
+
+                Err(TenantStateError::Other(e))
+            }
+        }
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -2455,7 +2609,7 @@ pub(crate) enum TenantMapError {
 /// this tenant to retry later, or wait for the InProgress state to end.
 ///
 /// This structure enforces the important invariant that we do not have overlapping
-/// tasks that will try use local storage for a the same tenant ID: we enforce that
+/// tasks that will try to use local storage for the same tenant ID: we enforce that
 /// the previous contents of a slot have been shut down before the slot can be
 /// left empty or used for something else
 ///
@@ -2468,7 +2622,7 @@ pub(crate) enum TenantMapError {
 /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
 /// `drop_old_value`. It is an error to call this without shutting down
 /// the contents of `old_value`.
-pub(crate) struct SlotGuard {
+pub(crate) struct SlotGuard<'a> {
     tenant_shard_id: TenantShardId,
     old_value: Option<TenantSlot>,
     upserted: bool,
@@ -2476,19 +2630,23 @@ pub(crate) struct SlotGuard {
     /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
     /// release any waiters as soon as this SlotGuard is dropped.
     completion: utils::completion::Completion,
+
+    tenants: &'a std::sync::RwLock<TenantsMap>,
 }
 
-impl SlotGuard {
+impl<'a> SlotGuard<'a> {
     fn new(
         tenant_shard_id: TenantShardId,
         old_value: Option<TenantSlot>,
         completion: utils::completion::Completion,
+        tenants: &'a std::sync::RwLock<TenantsMap>,
     ) -> Self {
         Self {
             tenant_shard_id,
             old_value,
             upserted: false,
             completion,
+            tenants,
         }
     }
 
@@ -2512,8 +2670,8 @@ impl SlotGuard {
             ));
         }
 
-        let replaced = {
-            let mut locked = TENANTS.write().unwrap();
+        let replaced: Option<TenantSlot> = {
+            let mut locked = self.tenants.write().unwrap();
 
             if let TenantSlot::InProgress(_) = new_value {
                 // It is never expected to try and upsert InProgress via this path: it should
@@ -2621,7 +2779,7 @@ impl SlotGuard {
     }
 }
 
-impl Drop for SlotGuard {
+impl<'a> Drop for SlotGuard<'a> {
     fn drop(&mut self) {
         if self.upserted {
             return;
@@ -2629,7 +2787,7 @@ impl Drop for SlotGuard {
 
         // Our old value is already shutdown, or it never existed: it is safe
        // for us to fully release the TenantSlot back into an empty state
-        let mut locked = TENANTS.write().unwrap();
+        let mut locked = self.tenants.write().unwrap();
 
         let m = match &mut *locked {
             TenantsMap::Initializing => {
@@ -2711,151 +2869,6 @@ enum TenantSlotAcquireMode {
     MustExist,
 }
 
-fn tenant_map_acquire_slot(
-    tenant_shard_id: &TenantShardId,
-    mode: TenantSlotAcquireMode,
-) -> Result<SlotGuard, TenantSlotError> {
-    tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
-}
-
-fn tenant_map_acquire_slot_impl(
-    tenant_shard_id: &TenantShardId,
-    tenants: &std::sync::RwLock<TenantsMap>,
-    mode: TenantSlotAcquireMode,
-) -> Result<SlotGuard, TenantSlotError> {
-    use TenantSlotAcquireMode::*;
-    METRICS.tenant_slot_writes.inc();
-
-    let mut locked = tenants.write().unwrap();
-    let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
-    let _guard = span.enter();
-
-    let m = match &mut *locked {
-        TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
-        TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
-        TenantsMap::Open(m) => m,
-    };
-
-    use std::collections::btree_map::Entry;
-
-    let entry = m.entry(*tenant_shard_id);
-
-    match entry {
-        Entry::Vacant(v) => match mode {
-            MustExist => {
-                tracing::debug!("Vacant && MustExist: return NotFound");
-                Err(TenantSlotError::NotFound(*tenant_shard_id))
-            }
-            _ => {
-                let (completion, barrier) = utils::completion::channel();
-                let inserting = TenantSlot::InProgress(barrier);
-                METRICS.slot_inserted(&inserting);
-                v.insert(inserting);
-                tracing::debug!("Vacant, inserted InProgress");
-                Ok(SlotGuard::new(*tenant_shard_id, None, completion))
-            }
-        },
-        Entry::Occupied(mut o) => {
-            // Apply mode-driven checks
-            match (o.get(), mode) {
-                (TenantSlot::InProgress(_), _) => {
-                    tracing::debug!("Occupied, failing for InProgress");
-                    Err(TenantSlotError::InProgress)
-                }
-                _ => {
-                    // Happy case: the slot was not in any state that violated our mode
-                    let (completion, barrier) = utils::completion::channel();
-                    let in_progress = TenantSlot::InProgress(barrier);
-                    METRICS.slot_inserted(&in_progress);
-                    let old_value = o.insert(in_progress);
-                    METRICS.slot_removed(&old_value);
-                    tracing::debug!("Occupied, replaced with InProgress");
-                    Ok(SlotGuard::new(
-                        *tenant_shard_id,
-                        Some(old_value),
-                        completion,
-                    ))
-                }
-            }
-        }
-    }
-}
-
-/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
-/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
-/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
-/// operation would be needed to remove it.
-async fn remove_tenant_from_memory<V, F>(
-    tenants: &std::sync::RwLock<TenantsMap>,
-    tenant_shard_id: TenantShardId,
-    tenant_cleanup: F,
-) -> Result<V, TenantStateError>
-where
-    F: std::future::Future<Output = anyhow::Result<V>>,
-{
-    let mut slot_guard =
-        tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
-
-    // allow pageserver shutdown to await for our completion
-    let (_guard, progress) = completion::channel();
-
-    // The SlotGuard allows us to manipulate the Tenant object without fear of some
-    // concurrent API request doing something else for the same tenant ID.
-    let attached_tenant = match slot_guard.get_old_value() {
-        Some(TenantSlot::Attached(tenant)) => {
-            // whenever we remove a tenant from memory, we don't want to flush and wait for upload
-            let shutdown_mode = ShutdownMode::Hard;
-
-            // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
-            // that we can continue safely to cleanup.
-            match tenant.shutdown(progress, shutdown_mode).await {
-                Ok(()) => {}
-                Err(_other) => {
-                    // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
-                    // wait for it but return an error right away because these are distinct requests.
-                    slot_guard.revert();
-                    return Err(TenantStateError::IsStopping(tenant_shard_id));
-                }
-            }
-            Some(tenant)
-        }
-        Some(TenantSlot::Secondary(secondary_state)) => {
-            tracing::info!("Shutting down in secondary mode");
-            secondary_state.shutdown().await;
-            None
-        }
-        Some(TenantSlot::InProgress(_)) => {
-            // Acquiring a slot guarantees its old value was not InProgress
-            unreachable!();
-        }
-        None => None,
-    };
-
-    match tenant_cleanup
-        .await
-        .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
-    {
-        Ok(hook_value) => {
-            // Success: drop the old TenantSlot::Attached.
-            slot_guard
-                .drop_old_value()
-                .expect("We just called shutdown");
-
-            Ok(hook_value)
-        }
-        Err(e) => {
-            // If we had a Tenant, set it to Broken and put it back in the TenantsMap
-            if let Some(attached_tenant) = attached_tenant {
-                attached_tenant.set_broken(e.to_string()).await;
-            }
-            // Leave the broken tenant in the map
-            slot_guard.revert();
-
-            Err(TenantStateError::Other(e))
-        }
-    }
-}
-
 use http_utils::error::ApiError;
 use pageserver_api::models::TimelineGcRequest;
 
@@ -2866,11 +2879,15 @@ mod tests {
     use std::collections::BTreeMap;
     use std::sync::Arc;
 
+    use storage_broker::BrokerClientChannel;
     use tracing::Instrument;
 
     use super::super::harness::TenantHarness;
     use super::TenantsMap;
-    use crate::tenant::mgr::TenantSlot;
+    use crate::tenant::{
+        TenantSharedResources,
+        mgr::{BackgroundPurges, TenantManager, TenantSlot},
+    };
 
     #[tokio::test(start_paused = true)]
     async fn shutdown_awaits_in_progress_tenant() {
@@ -2891,23 +2908,47 @@ mod tests {
         let _e = span.enter();
 
         let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
-        let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
 
         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
         // permit it to proceed: that will stick the tenant in InProgress
 
+        let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
+            crate::basebackup_cache::BasebackupPrepareRequest,
+        >();
+
+        let tenant_manager = TenantManager {
+            tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
+            conf: h.conf,
+            resources: TenantSharedResources {
+                broker_client: BrokerClientChannel::connect_lazy("foobar.com")
+                    .await
+                    .unwrap(),
+                remote_storage: h.remote_storage.clone(),
+                deletion_queue_client: h.deletion_queue.new_client(),
+                l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
+                    h.conf.l0_flush.clone(),
+                ),
+                basebackup_prepare_sender,
+                feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
+            },
+            cancel: tokio_util::sync::CancellationToken::new(),
+            background_purges: BackgroundPurges::default(),
+        };
+
+        let tenant_manager = Arc::new(tenant_manager);
+
         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
         let mut remove_tenant_from_memory_task = {
+            let tenant_manager = tenant_manager.clone();
            let jh = tokio::spawn({
-                let tenants = tenants.clone();
                async move {
                    let cleanup = async move {
                        drop(until_cleanup_started);
                        can_complete_cleanup.wait().await;
                        anyhow::Ok(())
                    };
-                    super::remove_tenant_from_memory(&tenants, id, cleanup).await
+                    tenant_manager.remove_tenant_from_memory(id, cleanup).await
                }
                .instrument(h.span())
            });
@@ -2920,9 +2961,11 @@ mod tests {
 
         let mut shutdown_task = {
             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
+            let tenant_manager = tenant_manager.clone();
+
             let shutdown_task = tokio::spawn(async move {
                 drop(until_shutdown_started);
-                super::shutdown_all_tenants0(&tenants).await;
+                tenant_manager.shutdown_all_tenants0().await;
             });
 
             shutdown_started.wait().await;
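As the test above shows, startup now happens in two phases: a cheap, synchronous `init` constructs the manager in `Initializing`, and the async `init_tenant_mgr` later flips it to `Open` — and tests can build a `TenantManager` directly instead of going through a global. A hedged sketch of that two-phase shape (not the real pageserver types; invented `u32`/`String` stand-ins, and assuming `tokio` as the async runtime):

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

enum TenantsMap {
    Initializing,
    Open(BTreeMap<u32, String>),
}

struct TenantManager {
    tenants: RwLock<TenantsMap>,
}

// Phase 1: synchronous construction; safe to call before any runtime exists.
fn init() -> TenantManager {
    TenantManager {
        tenants: RwLock::new(TenantsMap::Initializing),
    }
}

// Phase 2: async loading, then a single transition to `Open`.
async fn init_tenant_mgr(mgr: Arc<TenantManager>) {
    debug_assert!(matches!(
        *mgr.tenants.read().unwrap(),
        TenantsMap::Initializing
    ));
    // Stand-in for scanning local disk / remote storage.
    let loaded = BTreeMap::from([(1, "tenant-one".to_string())]);
    *mgr.tenants.write().unwrap() = TenantsMap::Open(loaded);
}

#[tokio::main]
async fn main() {
    let mgr = Arc::new(init());
    init_tenant_mgr(mgr.clone()).await;
    assert!(matches!(*mgr.tenants.read().unwrap(), TenantsMap::Open(_)));
}
```

Splitting construction from loading is what lets `pageserver.rs` create the `Arc<TenantManager>` up front and hand clones to other subsystems before the (slow) tenant attach work runs on the background runtime.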