From c5300e55f665654d4f7b26f3eec2f20d080dac18 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 10 Oct 2023 12:34:48 +0100 Subject: [PATCH] pageserver: add InProgress top level state & make TenantsMap lock synchronous --- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/consumption_metrics/metrics.rs | 1 - pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 77 +- pageserver/src/page_service.rs | 2 +- pageserver/src/tenant.rs | 18 +- pageserver/src/tenant/delete.rs | 38 +- pageserver/src/tenant/mgr.rs | 902 +++++++++++------- .../src/tenant/timeline/eviction_task.rs | 15 +- 9 files changed, 652 insertions(+), 405 deletions(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 061045eb76..9e8377c1f1 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker( continue; } - if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await { + if let Ok(tenant) = mgr::get_tenant(tenant_id, true) { // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? // We can put in some prioritization for consumption metrics. // Same for the loop that fetches computed metrics. diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index 652dd98683..4986d38c1a 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -206,7 +206,6 @@ pub(super) async fn collect_all_metrics( None } else { crate::tenant::mgr::get_tenant(id, true) - .await .ok() .map(|tenant| (id, tenant)) } diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 413c941bc4..36476275e9 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -545,7 +545,7 @@ async fn collect_eviction_candidates( if cancel.is_cancelled() { return Ok(EvictionCandidates::Cancelled); } - let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await { + let tenant = match tenant::mgr::get_tenant(*tenant_id, true) { Ok(tenant) => tenant, Err(e) => { // this can happen if tenant has lifecycle transition after we fetched it diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2c46d733d6..591e20ae9a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -36,7 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::{ - GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError, + GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError, + TenantSlotUpsertError, TenantStateError, }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -147,28 +148,60 @@ impl From for ApiError { impl From for ApiError { fn from(tmie: TenantMapInsertError)
-> ApiError { match tmie { - TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => { - ApiError::ResourceUnavailable(format!("{tmie}").into()) - } - TenantMapInsertError::TenantAlreadyExists(id, state) => { - ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}")) - } - TenantMapInsertError::TenantExistsSecondary(id) => { - ApiError::Conflict(format!("tenant {id} already exists as secondary")) - } + TenantMapInsertError::SlotError(e) => e.into(), + TenantMapInsertError::SlotUpsertError(e) => e.into(), TenantMapInsertError::Other(e) => ApiError::InternalServerError(e), } } } +impl From<TenantSlotError> for ApiError { + fn from(e: TenantSlotError) -> ApiError { + use TenantSlotError::*; + match e { + NotFound(tenant_id) => { + ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into()) + } + e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")), + e @ Conflict(_) => ApiError::Conflict(format!("{e}")), + InProgress => { + ApiError::ResourceUnavailable("Tenant is being modified concurrently".into()) + } + MapState(e) => e.into(), + } + } +} + +impl From<TenantSlotUpsertError> for ApiError { + fn from(e: TenantSlotUpsertError) -> ApiError { + use TenantSlotUpsertError::*; + match e { + InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")), + MapState(e) => e.into(), + } + } +} + +impl From<TenantMapError> for ApiError { + fn from(e: TenantMapError) -> ApiError { + use TenantMapError::*; + match e { + StillInitializing | ShuttingDown => { + ApiError::ResourceUnavailable(format!("{e}").into()) + } + } + } +} + impl From<TenantStateError> for ApiError { fn from(tse: TenantStateError) -> ApiError { match tse { - TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()), TenantStateError::IsStopping(_) => { ApiError::ResourceUnavailable("Tenant is stopping".into()) } - _ => ApiError::InternalServerError(anyhow::Error::new(tse)), + TenantStateError::SlotError(e) => e.into(), + TenantStateError::SlotUpsertError(e) => e.into(), + TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)), } } } @@ -243,6 +276,9 @@ impl From<DeleteTenantError> for ApiError { Get(g) => ApiError::from(g), e @ AlreadyInProgress => ApiError::Conflict(e.to_string()), Timeline(t) => ApiError::from(t), + NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()), + SlotError(e) => e.into(), + SlotUpsertError(e) => e.into(), Other(o) => ApiError::InternalServerError(o), e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()), } @@ -369,7 +405,7 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -416,7 +452,7 @@ async fn timeline_list_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let response_data = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; let timelines = tenant.list_timelines(); let mut response_data = Vec::with_capacity(timelines.len()); @@ -455,7 +491,7 @@ async fn timeline_detail_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline_info = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; let timeline = tenant .get_timeline(timeline_id, false) @@ -713,7 +749,7 @@ async fn
tenant_status( check_permission(&request, Some(tenant_id))?; let tenant_info = async { - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; // Calculate total physical size of all timelines let mut current_physical_size = 0; @@ -776,7 +812,7 @@ async fn tenant_size_handler( let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; // this can be long operation let inputs = tenant @@ -1035,7 +1071,7 @@ async fn get_tenant_config_handler( let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; let response = HashMap::from([ ( @@ -1094,7 +1130,7 @@ async fn put_tenant_location_config_handler( .await { match e { - TenantStateError::NotFound(_) => { + TenantStateError::SlotError(TenantSlotError::NotFound(_)) => { // This API is idempotent: a NotFound on a detach is fine. } _ => return Err(e.into()), @@ -1132,7 +1168,6 @@ async fn handle_tenant_break( let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?; let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) - .await .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; tenant.set_broken("broken from test".to_owned()).await; @@ -1437,7 +1472,7 @@ async fn active_timeline_of_active_tenant( tenant_id: TenantId, timeline_id: TimelineId, ) -> Result, ApiError> { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; tenant .get_timeline(timeline_id, true) .map_err(|e| ApiError::NotFound(e.into())) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 536334d051..4d265a8c6f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1314,7 +1314,7 @@ async fn get_active_tenant_with_timeout( tenant_id: TenantId, _ctx: &RequestContext, /* require get a context to support cancellation in the future */ ) -> Result, GetActiveTenantError> { - let tenant = match mgr::get_tenant(tenant_id, false).await { + let tenant = match mgr::get_tenant(tenant_id, false) { Ok(tenant) => tenant, Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), Err(GetTenantError::NotActive(_)) => { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a426ac87b..62da6b618e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -254,6 +254,12 @@ pub struct Tenant { pub(crate) delete_progress: Arc>, } +impl std::fmt::Debug for Tenant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.tenant_id, self.current_state()) + } +} + pub(crate) enum WalRedoManager { Prod(PostgresRedoManager), #[cfg(test)] @@ -526,7 +532,7 @@ impl Tenant { resources: TenantSharedResources, attached_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -1833,6 +1839,7 @@ impl Tenant { } Err(SetStoppingError::AlreadyStopping(other)) => { // give caller the option to wait for this this shutdown + info!("Tenant::shutdown: AlreadyStopping"); return Err(other); } }; @@ -2110,6 +2117,9 @@ where } impl Tenant { + pub fn get_tenant_id(&self) -> 
TenantId { + self.tenant_id + } pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.read().unwrap().tenant_conf } @@ -4236,11 +4246,7 @@ mod tests { metadata_bytes[8] ^= 1; std::fs::write(metadata_path, metadata_bytes)?; - let err = harness - .try_load_local(&ctx) - .await - .err() - .expect("should fail"); + let err = harness.try_load_local(&ctx).await.expect_err("should fail"); // get all the stack with all .context, not only the last one let message = format!("{err:#}"); let expected = "failed to load metadata"; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 87b48e4bee..7344dd1d92 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -21,7 +21,7 @@ use crate::{ }; use super::{ - mgr::{GetTenantError, TenantsMap}, + mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap}, remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, span, timeline::delete::DeleteTimelineFlow, @@ -33,12 +33,21 @@ pub(crate) enum DeleteTenantError { #[error("GetTenant {0}")] Get(#[from] GetTenantError), + #[error("Tenant not attached")] + NotAttached, + #[error("Invalid state {0}. Expected Active or Broken")] InvalidState(TenantState), #[error("Tenant deletion is already in progress")] AlreadyInProgress, + #[error("Tenant map slot error {0}")] + SlotError(#[from] TenantSlotError), + + #[error("Tenant map slot upsert error {0}")] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error("Timeline {0}")] Timeline(#[from] DeleteTimelineError), @@ -273,12 +282,12 @@ impl DeleteTenantFlow { pub(crate) async fn run( conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, - tenant_id: TenantId, + tenants: &'static std::sync::RwLock, + tenant: Arc, ) -> Result<(), DeleteTenantError> { span::debug_assert_current_span_has_tenant_id(); - let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?; + let mut guard = Self::prepare(&tenant).await?; if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await { tenant.set_broken(format!("{e:#}")).await; @@ -378,7 +387,7 @@ impl DeleteTenantFlow { guard: DeletionGuard, tenant: &Arc, preload: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, init_order: Option, ctx: &RequestContext, ) -> Result<(), DeleteTenantError> { @@ -405,15 +414,8 @@ impl DeleteTenantFlow { } async fn prepare( - tenants: &tokio::sync::RwLock, - tenant_id: TenantId, - ) -> Result<(Arc, tokio::sync::OwnedMutexGuard), DeleteTenantError> { - let m = tenants.read().await; - - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; - + tenant: &Arc, + ) -> Result, DeleteTenantError> { // FIXME: unsure about active only. Our init jobs may not be cancellable properly, // so at least for now allow deletions only for active tenants. TODO recheck // Broken and Stopping is needed for retries. 
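DeleteTenantFlow now takes &'static std::sync::RwLock<TenantsMap> rather than tokio::sync::RwLock<TenantsMap> (here and in the hunks below), which means a map guard must never live across an .await. A minimal sketch of that discipline follows; Tenant, TenantsMap and shutdown_one here are simplified stand-ins rather than the pageserver's real definitions, and a tokio runtime is assumed:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for the real pageserver types.
struct Tenant;

impl Tenant {
    async fn shutdown(&self) {
        // In the real code: stop background tasks, flush and upload, etc.
    }
}

type TenantsMap = HashMap<u64, Arc<Tenant>>;

async fn shutdown_one(tenants: &RwLock<TenantsMap>, tenant_id: u64) {
    // Clone the Arc out while holding the synchronous lock, and release the
    // guard before the first .await: a std::sync guard held across an await
    // point stalls every other user of the lock, and because the guard is not
    // Send, tokio::spawn would reject such a future anyway.
    let tenant = {
        let locked = tenants.read().unwrap();
        locked.get(&tenant_id).cloned()
    };

    if let Some(tenant) = tenant {
        tenant.shutdown().await;
    }
}

#[tokio::main]
async fn main() {
    let tenants = RwLock::new(TenantsMap::from([(1, Arc::new(Tenant))]));
    shutdown_one(&tenants, 1).await;
}

The same discipline shows up throughout the rest of the patch: TENANTS.write().unwrap() or .read().unwrap() is confined to a small block, and only cloned Arcs, barriers, or extracted slot values escape into the async code that follows.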
@@ -447,14 +449,14 @@ impl DeleteTenantFlow { ))); } - Ok((Arc::clone(tenant), guard)) + Ok(guard) } fn schedule_background( guard: OwnedMutexGuard, conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: Arc, ) { let tenant_id = tenant.tenant_id; @@ -487,7 +489,7 @@ impl DeleteTenantFlow { mut guard: OwnedMutexGuard, conf: &PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: &Arc, ) -> Result<(), DeleteTenantError> { // Tree sort timelines, schedule delete for them. Mention retries from the console side. @@ -535,7 +537,7 @@ impl DeleteTenantFlow { .await .context("cleanup_remaining_fs_traces")?; - let mut locked = tenants.write().await; + let mut locked = tenants.write().unwrap(); if locked.remove(&tenant.tenant_id).is_none() { warn!("Tenant got removed from tenants map during deletion"); }; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 33fdc76f8d..d6e8513444 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -3,13 +3,14 @@ use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; use rand::{distributions::Alphanumeric, Rng}; -use std::collections::{hash_map, HashMap}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; -use tokio::sync::RwLock; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; @@ -47,10 +48,22 @@ use super::TenantSharedResources; /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during /// its lifetime, and we can preserve some important safety invariants like `Tenant` always /// having a properly acquired generation (Secondary doesn't need a generation) -#[derive(Clone)] pub(crate) enum TenantSlot { Attached(Arc), Secondary, + /// In this state, other administrative operations acting on the TenantId should + /// block, or return a retry indicator equivalent to HTTP 503. + InProgress(utils::completion::Barrier), +} + +impl std::fmt::Debug for TenantSlot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), + Self::Secondary => write!(f, "Secondary"), + Self::InProgress(_) => write!(f, "InProgress"), + } + } } impl TenantSlot { @@ -59,14 +72,7 @@ impl TenantSlot { match self { Self::Attached(t) => Some(t), Self::Secondary => None, - } - } - - /// Consume self and return the `Tenant` that was in this slot if attached, else None - fn into_attached(self) -> Option> { - match self { - Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::InProgress(_) => None, } } } @@ -77,7 +83,7 @@ pub(crate) enum TenantsMap { /// [`init_tenant_mgr`] is not done yet. Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. - /// New tenants can be added using [`tenant_map_insert`]. + /// New tenants can be added using [`tenant_map_acquire_slot`]. Open(HashMap), /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`]. /// Existing tenants are still accessible, but no new tenants can be created. @@ -97,19 +103,10 @@ impl TenantsMap { } } - /// Get the contents of the map at this tenant ID, even if it is in secondary state. 
- pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> { + pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option { match self { TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id), - } - } - pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option> { - match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - m.remove(tenant_id).and_then(TenantSlot::into_attached) - } + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), } } } @@ -147,7 +144,8 @@ async fn safe_rename_tenant_dir(path: impl AsRef) -> std::io::Result> = Lazy::new(|| RwLock::new(TenantsMap::Initializing)); +static TENANTS: Lazy> = + Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing)); /// Create a directory, including parents. This does no fsyncs and makes /// no guarantees about the persistence of the resulting metadata: for @@ -456,7 +454,7 @@ pub async fn init_tenant_mgr( info!("Processed {} local tenants at startup", tenants.len()); - let mut tenants_map = TENANTS.write().await; + let mut tenants_map = TENANTS.write().unwrap(); assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); *tenants_map = TenantsMap::Open(tenants); Ok(()) @@ -472,7 +470,7 @@ pub(crate) fn tenant_spawn( resources: TenantSharedResources, location_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -533,12 +531,13 @@ pub(crate) async fn shutdown_all_tenants() { shutdown_all_tenants0(&TENANTS).await } -async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { +async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { use utils::completion; - // Prevent new tenants from being created. - let tenants_to_shut_down = { - let mut m = tenants.write().await; + // Under write lock (prevent any new tenants being created), extract the list + // of tenants to shut down. + let (in_progress_ops, tenants_to_shut_down) = { + let mut m = tenants.write().unwrap(); match &mut *m { TenantsMap::Initializing => { *m = TenantsMap::ShuttingDown(HashMap::default()); @@ -546,9 +545,28 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { return; } TenantsMap::Open(tenants) => { - let tenants_clone = tenants.clone(); - *m = TenantsMap::ShuttingDown(std::mem::take(tenants)); - tenants_clone + let mut shutdown_state = HashMap::new(); + let mut in_progress_ops = Vec::new(); + let mut tenants_to_shut_down = Vec::new(); + + for (k, v) in tenants.drain() { + match v { + TenantSlot::Attached(t) => { + tenants_to_shut_down.push(t.clone()); + shutdown_state.insert(k, TenantSlot::Attached(t)); + } + TenantSlot::Secondary => { + shutdown_state.insert(k, TenantSlot::Secondary); + } + TenantSlot::InProgress(notify) => { + // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will + // wait for their notifications to fire in this function. + in_progress_ops.push(notify); + } + } + } + *m = TenantsMap::ShuttingDown(shutdown_state); + (in_progress_ops, tenants_to_shut_down) } TenantsMap::ShuttingDown(_) => { // TODO: it is possible that detach and shutdown happen at the same time. 
as a @@ -559,25 +577,29 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { } }; + info!( + "Waiting for {} InProgress tenants to complete...", + in_progress_ops.len() + ); + for barrier in in_progress_ops { + barrier.wait().await; + } + + info!( + "Waiting for {} attached tenants to shut down...", + tenants_to_shut_down.len() + ); let started_at = std::time::Instant::now(); let mut join_set = JoinSet::new(); - for (tenant_id, tenant) in tenants_to_shut_down { + for tenant in tenants_to_shut_down { + let tenant_id = tenant.get_tenant_id(); join_set.spawn( async move { let freeze_and_flush = true; let res = { let (_guard, shutdown_progress) = completion::channel(); - match tenant { - TenantSlot::Attached(t) => { - t.shutdown(shutdown_progress, freeze_and_flush).await - } - TenantSlot::Secondary => { - // TODO: once secondary mode downloads are implemented, - // ensure they have all stopped before we reach this point. - Ok(()) - } - } + tenant.shutdown(shutdown_progress, freeze_and_flush).await }; if let Err(other_progress) = res { @@ -649,42 +671,35 @@ pub(crate) async fn create_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); + let location_conf = LocationConf::attached_single(tenant_conf, generation); - // We're holding the tenants lock in write mode while doing local IO. - // If this section ever becomes contentious, introduce a new `TenantState::Creating` - // and do the work in that state. - super::create_tenant_files(conf, &location_conf, &tenant_id).await?; + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 + let created_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Create, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 - let tenant_path = conf.tenant_path(&tenant_id); + let created_tenant_id = created_tenant.tenant_id(); + if tenant_id != created_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( + "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})", + ))); + } - let created_tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - resources, - AttachedTenantConf::try_from(location_conf)?, - None, - &TENANTS, - SpawnMode::Create, - ctx, - )?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. 
- // See https://github.com/neondatabase/neon/issues/4233 + tenant_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; - let crated_tenant_id = created_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == crated_tenant_id, - "loaded created tenant has unexpected tenant id \ - (expect {tenant_id} != actual {crated_tenant_id})", - ); - Ok(created_tenant) - }) - .await + Ok(created_tenant) } #[derive(Debug, thiserror::Error)] @@ -701,7 +716,7 @@ pub(crate) async fn set_new_tenant_config( tenant_id: TenantId, ) -> Result<(), SetNewTenantConfigError> { info!("configuring tenant {tenant_id}"); - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; // This is a legacy API that only operates on attached tenants: the preferred // API to use is the location_config/ endpoint, which lets the caller provide @@ -727,32 +742,49 @@ pub(crate) async fn upsert_location( ) -> Result<(), anyhow::Error> { info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); - let mut existing_tenant = match get_tenant(tenant_id, false).await { - Ok(t) => Some(t), - Err(GetTenantError::NotFound(_)) => None, - Err(e) => anyhow::bail!(e), - }; + // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, + // then we do not need to set the slot to InProgress, we can just call into the + // existng tenant. + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?; + match (&new_location_config.mode, peek_slot) { + (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { + if attach_conf.generation == tenant.generation { + // A transition from Attached to Attached in the same generation, we may + // take our fast path and just provide the updated configuration + // to the tenant. + tenant.set_new_location_config(AttachedTenantConf::try_from( + new_location_config, + )?); - // If we need to shut down a Tenant, do that first - let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) { - (LocationMode::Secondary(_), Some(t)) => Some(t), - (LocationMode::Attached(attach_conf), Some(t)) => { - if attach_conf.generation != t.generation { - Some(t) - } else { - None + // Persist the new config in the background, to avoid holding up any + // locks while we do so. + // TODO + + return Ok(()); + } else { + // Different generations, fall through to general case + } + } + _ => { + // Not an Attached->Attached transition, fall through to general case } } - _ => None, - }; + } - // TODO: currently we risk concurrent operations interfering with the tenant - // while we await shutdown, but we also should not hold the TenantsMap lock - // across the whole operation. Before we start using this function in production, - // a follow-on change will revise how concurrency is handled in TenantsMap. - // (https://github.com/neondatabase/neon/issues/5378) + // General case for upserts to TenantsMap, excluding the case above: we will substitute an + // InProgress value to the slot while we make whatever changes are required. The state for + // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: + // the state is ill-defined while we're in transition. Transitions are async, but fast: we do + // not do significant I/O, and shutdowns should be prompt via cancellation tokens. 
+ let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, None)?; - if let Some(tenant) = shutdown_tenant { + if let Some(TenantSlot::Attached(tenant)) = tenant_guard.take_value() { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. let (_guard, progress) = utils::completion::channel(); match tenant.get_attach_mode() { @@ -774,82 +806,61 @@ pub(crate) async fn upsert_location( barrier.wait().await; } } - existing_tenant = None; } - if let Some(tenant) = existing_tenant { - // Update the existing tenant - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; - tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?); - } else { - // Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock, - // and re-check that the state of anything we are replacing is as expected. - tenant_map_upsert_slot(tenant_id, |old_value| async move { - if let Some(TenantSlot::Attached(t)) = old_value { - if !matches!(t.current_state(), TenantState::Stopping { .. }) { - anyhow::bail!("Tenant state changed during location configuration update"); - } - } + let tenant_path = conf.tenant_path(&tenant_id); + let new_slot = match &new_location_config.mode { + LocationMode::Secondary(_) => { let tenant_path = conf.tenant_path(&tenant_id); + // Directory doesn't need to be fsync'd because if we crash it can + // safely be recreated next time this tenant location is configured. + unsafe_create_dir_all(&tenant_path) + .await + .with_context(|| format!("Creating {tenant_path}"))?; - let new_slot = match &new_location_config.mode { - LocationMode::Secondary(_) => { - // Directory doesn't need to be fsync'd because if we crash it can - // safely be recreated next time this tenant location is configured. - unsafe_create_dir_all(&tenant_path) - .await - .with_context(|| format!("Creating {tenant_path}"))?; + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + TenantSlot::Secondary + } + LocationMode::Attached(_attach_config) => { + let timelines_path = conf.timelines_path(&tenant_id); - TenantSlot::Secondary - } - LocationMode::Attached(_attach_config) => { - // FIXME: should avoid doing this disk I/O inside the TenantsMap lock, - // we have the same problem in load_tenant/attach_tenant. Probably - // need a lock in TenantSlot to fix this. 
- let timelines_path = conf.timelines_path(&tenant_id); + // Directory doesn't need to be fsync'd because we do not depend on + // it to exist after crashes: it may be recreated when tenant is + // re-attached, see https://github.com/neondatabase/neon/issues/5550 + unsafe_create_dir_all(&timelines_path) + .await + .with_context(|| format!("Creating {timelines_path}"))?; - // Directory doesn't need to be fsync'd because we do not depend on - // it to exist after crashes: it may be recreated when tenant is - // re-attached, see https://github.com/neondatabase/neon/issues/5550 - unsafe_create_dir_all(&timelines_path) - .await - .with_context(|| format!("Creating {timelines_path}"))?; + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + let tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }, + AttachedTenantConf::try_from(new_location_config)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; - let tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client, - }, - AttachedTenantConf::try_from(new_location_config)?, - None, - &TENANTS, - SpawnMode::Normal, - ctx, - )?; + TenantSlot::Attached(tenant) + } + }; - TenantSlot::Attached(tenant) - } - }; + tenant_guard.upsert(new_slot)?; - Ok(new_slot) - }) - .await?; - } Ok(()) } @@ -870,11 +881,11 @@ pub(crate) enum GetTenantError { /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. /// /// This method is cancel-safe. -pub(crate) async fn get_tenant( +pub(crate) fn get_tenant( tenant_id: TenantId, active_only: bool, ) -> Result, GetTenantError> { - let m = TENANTS.read().await; + let m = TENANTS.read().unwrap(); let tenant = m .get(&tenant_id) .ok_or(GetTenantError::NotFound(tenant_id))?; @@ -900,7 +911,35 @@ pub(crate) async fn delete_tenant( remote_storage: Option, tenant_id: TenantId, ) -> Result<(), DeleteTenantError> { - DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await + // We acquire a SlotGuard during this function to protect against concurrent + // changes while the ::prepare phase of DeleteTenantFlow executes, but then + // have to return the Tenant to the map while the background deletion runs. + // + // TODO: refactor deletion to happen outside the lifetime of a Tenant. + // Currently, deletion requires a reference to the tenants map in order to + // keep the Tenant in the map until deletion is complete, and then remove + // it at the end. 
+ // + // See https://github.com/neondatabase/neon/issues/5080 + + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, Some(true))?; + + // unwrap is safe because we used expect_exist=true when acquiring the slot + let slot = slot_guard.take_value().unwrap(); + + let tenant = match &slot { + TenantSlot::Attached(tenant) => tenant.clone(), + _ => { + // Express "not attached" as equivalent to "not found" + return Err(DeleteTenantError::NotAttached); + } + }; + + let result = DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant).await; + + // Replace our InProgress marker with the Tenant in attached state, after the prepare phase of deletion is done + slot_guard.upsert(slot)?; + result } #[derive(Debug, thiserror::Error)] @@ -917,18 +956,20 @@ pub(crate) async fn delete_timeline( timeline_id: TimelineId, _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantStateError { - #[error("Tenant {0} not found")] - NotFound(TenantId), #[error("Tenant {0} is stopping")] IsStopping(TenantId), #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -967,7 +1008,7 @@ pub(crate) async fn detach_tenant( async fn detach_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, deletion_queue_client: &DeletionQueueClient, @@ -988,7 +1029,12 @@ async fn detach_tenant0( // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. 
- if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) { + if detach_ignored + && matches!( + removal_result, + Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) + ) + { let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); if tenant_ignore_mark.exists() { info!("Detaching an ignored tenant"); @@ -1011,31 +1057,44 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let tenant_path = conf.tenant_path(&tenant_id); - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); - if tenant_ignore_mark.exists() { - std::fs::remove_file(&tenant_ignore_mark) - .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; - } + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_path = conf.tenant_path(&tenant_id); - let resources = TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client - }; + let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); + if tenant_ignore_mark.exists() { + std::fs::remove_file(&tenant_ignore_mark).with_context(|| { + format!( + "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading" + ) + })?; + } - let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?; - location_conf.attach_in_generation(generation); - Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + let resources = TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }; - let new_tenant = tenant_spawn(conf, tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx) - .with_context(|| { - format!("Failed to schedule tenant processing in path {tenant_path:?}") - })?; + let mut location_conf = + Tenant::load_tenant_config(conf, &tenant_id).map_err(TenantMapInsertError::Other)?; + location_conf.attach_in_generation(generation); - Ok(new_tenant) - }).await?; + Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + + let new_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + ) + .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?; + + tenant_guard.upsert(TenantSlot::Attached(new_tenant))?; Ok(()) } @@ -1048,7 +1107,7 @@ pub(crate) async fn ignore_tenant( async fn ignore_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, ) -> Result<(), TenantStateError> { remove_tenant_from_memory(tenants, tenant_id, async { @@ -1076,7 +1135,7 @@ pub(crate) enum TenantMapListError { /// Get list of tenants, for the mgmt API /// pub(crate) async fn list_tenants() -> Result, TenantMapListError> { - let tenants = TENANTS.read().await; + let tenants = TENANTS.read().unwrap(); let m = match &*tenants { TenantsMap::Initializing => return Err(TenantMapListError::Initializing), TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m, @@ -1085,6 +1144,7 @@ pub(crate) async fn list_tenants() -> Result, Tenan .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), TenantSlot::Secondary => None, + TenantSlot::InProgress(_) => 
None, }) .collect()) } @@ -1101,101 +1161,302 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let location_conf = LocationConf::attached_single(tenant_conf, generation); + let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; + // TODO: tenant directory remains on disk if we bail out from here on. + // See https://github.com/neondatabase/neon/issues/4233 - let attached_tenant = tenant_spawn(conf, tenant_id, &tenant_dir, - resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. - // See https://github.com/neondatabase/neon/issues/4233 + let attached_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_dir, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 - let attached_tenant_id = attached_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == attached_tenant_id, + let attached_tenant_id = attached_tenant.tenant_id(); + if tenant_id != attached_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", - ); - Ok(attached_tenant) - }) - .await?; + ))); + } + + tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantMapInsertError { - #[error("tenant map is still initializing")] - StillInitializing, - #[error("tenant map is shutting down")] - ShuttingDown, - #[error("tenant {0} already exists, state: {1:?}")] - TenantAlreadyExists(TenantId, TenantState), - #[error("tenant {0} already exists in secondary state")] - TenantExistsSecondary(TenantId), + #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), #[error(transparent)] Other(#[from] anyhow::Error), } -/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that -/// entry is vacant. The closure is responsible for creating the tenant object and inserting -/// it into the tenants map through the vacnt entry that it receives as argument. -/// -/// NB: the closure should return quickly because the current implementation of tenants map -/// serializes access through an `RwLock`. -async fn tenant_map_insert( +/// Superset of TenantMapError: issues that can occur when acquiring a slot +/// for a particular tenant ID. +#[derive(Debug, thiserror::Error)] +pub enum TenantSlotError { + /// When acquiring a slot with the expectation that the tenant already exists. + #[error("Tenant {0} not found")] + NotFound(TenantId), + + /// When acquiring a slot with the expectation that the tenant does not already exist. 
+ #[error("tenant {0} already exists, state: {1:?}")] + AlreadyExists(TenantId, TenantState), + + #[error("tenant {0} already exists in but is not attached")] + Conflict(TenantId), + + // Tried to read a slot that is currently being mutated by another administrative + // operation. + #[error("tenant has a state change in progress, try again later")] + InProgress, + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +/// Superset of TenantMapError: issues that can occur when using a SlotGuard +/// to insert a new value. +#[derive(Debug, thiserror::Error)] +pub enum TenantSlotUpsertError { + /// An error where the slot is in an unexpected state, indicating a code bug + #[error("Internal error updating Tenant")] + InternalError(Cow<'static, str>), + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +/// Errors that can happen any time we are walking the tenant map to try and acquire +/// the TenantSlot for a particular tenant. +#[derive(Debug, thiserror::Error)] +pub enum TenantMapError { + // Tried to read while initializing + #[error("tenant map is still initializing")] + StillInitializing, + + // Tried to read while shutting down + #[error("tenant map is shutting down")] + ShuttingDown, +} + +/// Guards a particular tenant_id's content in the TenantsMap. While this +/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`] +/// for this tenant, which acts as a marker for any operations targeting +/// this tenant to retry later, or wait for the InProgress state to end. +pub struct SlotGuard { tenant_id: TenantId, - insert_fn: F, -) -> Result, TenantMapInsertError> -where - F: FnOnce() -> R, - R: std::future::Future>>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), - TenantsMap::Open(m) => m, - }; - match m.entry(tenant_id) { - hash_map::Entry::Occupied(e) => match e.get() { - TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists( - tenant_id, - t.current_state(), - )), - TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)), - }, - hash_map::Entry::Vacant(v) => match insert_fn().await { - Ok(tenant) => { - v.insert(TenantSlot::Attached(tenant.clone())); - Ok(tenant) + old_value: Option, + upserted: bool, + + /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will + /// release any waiters as soon as this SlotGuard is dropped. + _completion: utils::completion::Completion, +} + +unsafe impl Send for SlotGuard {} +unsafe impl Sync for SlotGuard {} + +impl SlotGuard { + fn new( + tenant_id: TenantId, + old_value: Option, + completion: utils::completion::Completion, + ) -> Self { + Self { + tenant_id, + old_value, + upserted: false, + _completion: completion, + } + } + + /// Take any value that was present in the slot before we acquired ownership + /// of it: in state transitions, this will be the old state. + fn take_value(&mut self) -> Option { + self.old_value.take() + } + + /// Emplace a new value in the slot. This consumes the guard, and after + /// returning, the slot is no longer protected from concurrent changes. 
+ fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> { + let mut locked = TENANTS.write().unwrap(); + + if let TenantSlot::InProgress(_) = new_value { + // It is never expected to try and upsert InProgress via this path: it should + // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug. + return Err(TenantSlotUpsertError::InternalError( + "Attempt to upsert an InProgress state".into(), + )); + } + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => { + return Err(TenantMapError::ShuttingDown.into()); } - Err(e) => Err(TenantMapInsertError::Other(e)), - }, + TenantsMap::Open(m) => m, + }; + + let replaced = m.insert(self.tenant_id, new_value); + self.upserted = true; + + // Sanity check: on an upsert we should always be replacing an InProgress marker + match replaced { + Some(TenantSlot::InProgress(_)) => { + // Expected case: we find our InProgress in the map: nothing should have + // replaced it because the code that acquires slots will not grant another + // one for the same TenantId. + Ok(()) + } + None => { + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during tenant upsert, this is a bug." + ); + Err(TenantSlotUpsertError::InternalError( + "Missing InProgress marker during tenant upsert".into(), + )) + } + Some(slot) => { + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + Err(TenantSlotUpsertError::InternalError( + "Unexpected contents of TenantSlot".into(), + )) + } + } } } -async fn tenant_map_upsert_slot<'a, F, R>( - tenant_id: TenantId, - upsert_fn: F, -) -> Result<(), TenantMapInsertError> -where - F: FnOnce(Option) -> R, - R: std::future::Future>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), +impl Drop for SlotGuard { + fn drop(&mut self) { + if !self.upserted { + let mut locked = TENANTS.write().unwrap(); + + let m = match &mut *locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => { + return; + } + TenantsMap::Open(m) => m, + }; + + let slot = m.remove(&self.tenant_id); + match slot { + Some(slot) => match slot { + TenantSlot::InProgress(_) => { + // Normal case: nothing should have replaced the TenantSlot value + // that was set when we were constructed. + } + _ => { + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + } + }, + None => { + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during SlotGuard drop, this is a bug." 
+ ); + } + } + } + } +} + +fn tenant_map_peek_slot<'a>( + tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, + tenant_id: &TenantId, +) -> Result, TenantMapError> { + let m = match tenants.deref() { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing), + TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown), TenantsMap::Open(m) => m, }; - match upsert_fn(m.remove(&tenant_id)).await { - Ok(upsert_val) => { - m.insert(tenant_id, upsert_val); - Ok(()) + Ok(m.get(tenant_id)) +} + +fn tenant_map_acquire_slot( + tenant_id: &TenantId, + expect_exist: Option, +) -> Result { + tenant_map_acquire_slot_impl(tenant_id, &TENANTS, expect_exist) +} + +fn tenant_map_acquire_slot_impl( + tenant_id: &TenantId, + tenants: &std::sync::RwLock, + expect_exist: Option, +) -> Result { + let mut locked = tenants.write().unwrap(); + let span = tracing::info_span!("acquire_slot", %tenant_id); + let _guard = span.enter(); + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()), + TenantsMap::Open(m) => m, + }; + + use std::collections::hash_map::Entry; + let entry = m.entry(*tenant_id); + match entry { + Entry::Vacant(v) => { + if let Some(true) = expect_exist { + tracing::debug!("Vacant & expect_exist: return NotFound"); + return Err(TenantSlotError::NotFound(*tenant_id)); + } + + let (completion, barrier) = utils::completion::channel(); + v.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Vacant, inserted InProgress"); + Ok(SlotGuard::new(*tenant_id, None, completion)) + } + Entry::Occupied(mut o) => { + match (o.get(), expect_exist) { + (TenantSlot::InProgress(_), _) => { + tracing::debug!("Occupied, failing for InProgress"); + return Err(TenantSlotError::InProgress); + } + (slot, Some(false)) => match slot { + TenantSlot::Attached(tenant) => { + tracing::debug!("Attached & !expected_exist, return AlreadyExists"); + return Err(TenantSlotError::AlreadyExists( + *tenant_id, + tenant.current_state(), + )); + } + _ => { + // FIXME: the AlreadyExists error assumes that we have a Tenant + // to get the state from + tracing::debug!("Occupied & !expected_exist, return AlreadyExists"); + return Err(TenantSlotError::AlreadyExists( + *tenant_id, + TenantState::Broken { + reason: "Present but not attached".to_string(), + backtrace: "".to_string(), + }, + )); + } + }, + _ => {} + }; + + let (completion, barrier) = utils::completion::channel(); + let old_value = o.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Occupied, replaced with InProgress"); + Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) } - Err(e) => Err(TenantMapInsertError::Other(e)), } } @@ -1204,7 +1465,7 @@ where /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal /// operation would be needed to remove it. async fn remove_tenant_from_memory( - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, tenant_cleanup: F, ) -> Result @@ -1213,20 +1474,14 @@ where { use utils::completion; - // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races. - // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal. - // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to - // avoid holding the lock for the entire process. 
- let tenant = { - match tenants - .write() - .await - .get_slot(&tenant_id) - .ok_or(TenantStateError::NotFound(tenant_id))? - { - TenantSlot::Attached(t) => Some(t.clone()), - TenantSlot::Secondary => None, - } + let mut tenant_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, Some(true))?; + let tenant_slot = tenant_guard.take_value(); + + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match tenant_slot { + Some(TenantSlot::Attached(t)) => Some(t), + _ => None, }; // allow pageserver shutdown to await for our completion @@ -1234,7 +1489,7 @@ where // If the tenant was attached, shut it down gracefully. For secondary // locations this part is not necessary - match tenant { + match &attached_tenant { Some(attached_tenant) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; @@ -1259,24 +1514,14 @@ where .await .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}")) { - Ok(hook_value) => { - let mut tenants_accessor = tenants.write().await; - if tenants_accessor.remove(&tenant_id).is_none() { - warn!("Tenant {tenant_id} got removed from memory before operation finished"); - } - Ok(hook_value) - } + Ok(hook_value) => Ok(hook_value), Err(e) => { - let tenants_accessor = tenants.read().await; - match tenants_accessor.get(&tenant_id) { - Some(tenant) => { - tenant.set_broken(e.to_string()).await; - } - None => { - warn!("Tenant {tenant_id} got removed from memory"); - return Err(TenantStateError::NotFound(tenant_id)); - } + // If we had a Tenant, set it to Broken and put it back in the TenantsMap + if let Some(attached_tenant) = attached_tenant { + attached_tenant.set_broken(e.to_string()).await; + tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; } + Err(TenantStateError::Other(e)) } } @@ -1293,7 +1538,7 @@ pub(crate) async fn immediate_gc( gc_req: TimelineGcRequest, ctx: &RequestContext, ) -> Result>, ApiError> { - let guard = TENANTS.read().await; + let guard = TENANTS.read().unwrap(); let tenant = guard .get(&tenant_id) .map(Arc::clone) @@ -1346,14 +1591,12 @@ mod tests { use super::{super::harness::TenantHarness, TenantsMap}; - #[tokio::test(start_paused = true)] - async fn shutdown_joins_remove_tenant_from_memory() { - // the test is a bit ugly with the lockstep together with spawned tasks. the aim is to make - // sure `shutdown_all_tenants0` per-tenant processing joins in any active - // remove_tenant_from_memory calls, which is enforced by making the operation last until - // we've ran `shutdown_all_tenants0` for a long time. + #[tokio::test] + async fn shutdown_awaits_in_progress_tenant() { + // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully + // wait for it to complete before proceeding. 
- let (t, _ctx) = TenantHarness::create("shutdown_joins_detach") + let (t, _ctx) = TenantHarness::create("shutdown_awaits_in_progress_tenant") .unwrap() .load() .await; @@ -1366,13 +1609,14 @@ mod tests { let _e = info_span!("testing", tenant_id = %id).entered(); let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]); - let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants))); + let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants))); + + // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually + // permit it to proceed: that will stick the tenant in InProgress let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel(); let (until_cleanup_started, cleanup_started) = utils::completion::channel(); - - // start a "detaching operation", which will take a while, until can_complete_cleanup - let cleanup_task = { + let mut remove_tenant_from_memory_task = { let jh = tokio::spawn({ let tenants = tenants.clone(); async move { @@ -1391,12 +1635,6 @@ mod tests { jh }; - let mut cleanup_progress = std::pin::pin!(t - .shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait()); - let mut shutdown_task = { let (until_shutdown_started, shutdown_started) = utils::completion::channel(); @@ -1409,37 +1647,17 @@ mod tests { shutdown_task }; - // if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always - // get to complete within timeout and fail the test. it is expected to continue awaiting - // until completion or SIGKILL during normal shutdown. - // - // the timeout is long to cover anything that shutdown_task could be doing, but it is - // handled instantly because we use tokio's time pausing in this test. 100s is much more than - // what we get from systemd on shutdown (10s). - let long_time = std::time::Duration::from_secs(100); + let long_time = std::time::Duration::from_secs(15); tokio::select! 
{ - _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"), - _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"), + _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"), + _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"), _ = tokio::time::sleep(long_time) => {}, } - // allow the remove_tenant_from_memory and thus eventually the shutdown to continue drop(until_cleanup_completed); - let (je, ()) = tokio::join!(shutdown_task, cleanup_progress); - je.expect("Tenant::shutdown shutdown not have panicked"); - cleanup_task - .await - .expect("no panicking") - .expect("remove_tenant_from_memory failed"); - - futures::future::poll_immediate( - t.shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait(), - ) - .await - .expect("the stopping progress must still be complete"); + // Now that we allow it to proceed, shutdown should complete immediately + remove_tenant_from_memory_task.await.unwrap().unwrap(); + shutdown_task.await.unwrap(); } } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index dc5c71bbe1..659bef1580 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -344,20 +344,7 @@ impl Timeline { // Make one of the tenant's timelines draw the short straw and run the calculation. // The others wait until the calculation is done so that they take into account the // imitated accesses that the winner made. - // - // It is critical we are responsive to cancellation here. Otherwise, we deadlock with - // tenant deletion (holds TENANTS in read mode) any other task that attempts to - // acquire TENANTS in write mode before we here call get_tenant. - // See https://github.com/neondatabase/neon/issues/5284. - let res = tokio::select! { - _ = cancel.cancelled() => { - return ControlFlow::Break(()); - } - res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => { - res - } - }; - let tenant = match res { + let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) { Ok(t) => t, Err(_) => { return ControlFlow::Break(());
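Taken together, tenant_map_acquire_slot, SlotGuard and TenantSlot::InProgress implement a small protocol: an administrative operation swaps the tenant's slot for an InProgress marker, does its work without holding the map lock, and then upserts the final state; concurrent callers either fail fast with a retryable error (surfaced as 503 by the HTTP layer) or wait for the marker to be released. The sketch below is a condensed illustration of that protocol, not the pageserver's implementation: it assumes a plain Mutex<HashMap>, a tokio watch channel standing in for utils::completion::{Completion, Barrier}, and a String standing in for Arc<Tenant>.

use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative stand-ins only.
type TenantId = u64;

#[derive(Clone)]
enum Slot {
    Attached(String),
    // Receivers resolve once the in-progress operation drops its sender.
    InProgress(tokio::sync::watch::Receiver<()>),
}

type Map = Mutex<HashMap<TenantId, Slot>>;

// Held by the single operation allowed to mutate this tenant's slot.
struct SlotGuard<'a> {
    map: &'a Map,
    tenant_id: TenantId,
    old_value: Option<Slot>,
    // Dropping the sender wakes everyone waiting on the InProgress marker.
    _done: tokio::sync::watch::Sender<()>,
}

fn acquire_slot(map: &Map, tenant_id: TenantId) -> Result<SlotGuard<'_>, &'static str> {
    let mut m = map.lock().unwrap();
    if matches!(m.get(&tenant_id), Some(Slot::InProgress(_))) {
        // Another administrative operation is underway: callers retry later.
        return Err("tenant has a state change in progress, try again later");
    }
    let (tx, rx) = tokio::sync::watch::channel(());
    let old_value = m.insert(tenant_id, Slot::InProgress(rx));
    Ok(SlotGuard { map, tenant_id, old_value, _done: tx })
}

impl SlotGuard<'_> {
    // Publish the final state; waiters wake when _done is dropped with self.
    fn upsert(self, new_value: Slot) {
        self.map.lock().unwrap().insert(self.tenant_id, new_value);
    }
}

// A reader willing to wait for an in-flight transition instead of erroring.
async fn wait_if_in_progress(map: &Map, tenant_id: TenantId) {
    let rx = match map.lock().unwrap().get(&tenant_id) {
        Some(Slot::InProgress(rx)) => Some(rx.clone()),
        _ => None,
    };
    if let Some(mut rx) = rx {
        // Returns Err once the sender inside the SlotGuard is dropped.
        let _ = rx.changed().await;
    }
}

#[tokio::main]
async fn main() {
    let map: Map = Mutex::new(HashMap::new());
    let guard = acquire_slot(&map, 1).expect("no competing operation");
    let _previous = guard.old_value.clone(); // e.g. shut down an old attached tenant here
    guard.upsert(Slot::Attached("tenant".to_string()));
    wait_if_in_progress(&map, 1).await; // returns immediately: slot is Attached again
}

One deliberate omission: unlike the real SlotGuard, this sketch does not remove the InProgress marker in Drop when upsert is never called, which the patch handles so that a failed operation cannot leave a tenant stuck in InProgress.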
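The rewritten shutdown_all_tenants0 above encodes the ordering the new slot states require: drain the map under the synchronous write lock, wait for any InProgress operations to release their barriers, and only then shut down attached tenants concurrently. A compressed sketch of that flow, again with stand-in types (watch receivers instead of completion barriers, Arc<String> instead of Arc<Tenant>):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::task::JoinSet;

// Simplified slots: the real map also has Secondary and a ShuttingDown state.
enum Slot {
    Attached(Arc<String>),
    InProgress(tokio::sync::watch::Receiver<()>),
}

async fn shutdown_all(map: &RwLock<HashMap<u64, Slot>>) {
    // Drain and partition the map while holding the synchronous write lock,
    // then release it before any await.
    let (in_progress, attached) = {
        let mut m = map.write().unwrap();
        let mut in_progress = Vec::new();
        let mut attached = Vec::new();
        for (_, slot) in m.drain() {
            match slot {
                Slot::InProgress(rx) => in_progress.push(rx),
                Slot::Attached(t) => attached.push(t),
            }
        }
        (in_progress, attached)
    };

    // First let in-flight administrative operations finish...
    for mut rx in in_progress {
        let _ = rx.changed().await; // resolves when the operation drops its sender
    }

    // ...then shut the attached tenants down concurrently.
    let mut join_set = JoinSet::new();
    for tenant in attached {
        join_set.spawn(async move {
            // In the real code: tenant.shutdown(progress, freeze_and_flush).await
            drop(tenant);
        });
    }
    while join_set.join_next().await.is_some() {}
}

#[tokio::main]
async fn main() {
    let (done_tx, done_rx) = tokio::sync::watch::channel(());
    let map = RwLock::new(HashMap::from([
        (1, Slot::Attached(Arc::new("t".into()))),
        (2, Slot::InProgress(done_rx)),
    ]));
    drop(done_tx); // pretend the in-flight operation has finished
    shutdown_all(&map).await;
}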