From 373f25bd90a5dbb86601d0484c2e73375a2900f8 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 22 May 2023 17:27:28 +0200
Subject: [PATCH] add API to wait for a tenant to appear in the tenant map

I developed this before I pivoted to
https://github.com/neondatabase/neon/pull/4299 for the problem I had at
hand. Yet, it might be useful to have an API that waits for a tenant to
appear in the tenant map.

Includes a demo use case inside the tenant tasks.
---
 pageserver/src/tenant/mgr.rs   | 184 +++++++++++++++++++++++++++------
 pageserver/src/tenant/tasks.rs |  83 +++------------
 2 files changed, 169 insertions(+), 98 deletions(-)

diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 1542d34a66..9b5bc3c7bf 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -32,23 +32,46 @@ enum TenantsMap {
     Initializing,
     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
     /// New tenants can be added using [`tenant_map_insert`].
-    Open(HashMap<TenantId, Arc<Tenant>>),
+    Open(HashMap<TenantId, OpenTenantsMapEntry>),
     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
     /// Existing tenants are still accessible, but no new tenants can be created.
     ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
 }
 
+enum OpenTenantsMapEntry {
+    InterestOnly(tokio::sync::broadcast::Sender<Arc<Tenant>>),
+    Available(Arc<Tenant>),
+}
+
 impl TenantsMap {
+    /// Return a ref to a tenant if it exists in the map.
+    /// If there's just interest in a tenant, but the tenant does not exist,
+    /// this function returns `None`.
     fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
         match self {
             TenantsMap::Initializing => None,
-            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
+            TenantsMap::Open(m) => match m.get(tenant_id) {
+                Some(tme) => match tme {
+                    OpenTenantsMapEntry::InterestOnly(_) => None,
+                    OpenTenantsMapEntry::Available(t) => Some(t),
+                },
+                None => None,
+            },
+            TenantsMap::ShuttingDown(m) => match m.get(tenant_id) {
+                Some(t) => Some(t),
+                None => None,
+            },
         }
     }
     fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
         match self {
             TenantsMap::Initializing => None,
-            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
+            TenantsMap::Open(m) => match m.remove(tenant_id) {
+                None => None,
+                Some(OpenTenantsMapEntry::InterestOnly(_)) => None,
+                Some(OpenTenantsMapEntry::Available(tenant)) => Some(tenant),
+            },
+            TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
         }
     }
 }
@@ -120,7 +143,8 @@ pub async fn init_tenant_mgr(
                 &ctx,
             ) {
                 Ok(tenant) => {
-                    tenants.insert(tenant.tenant_id(), tenant);
+                    tenants
+                        .insert(tenant.tenant_id(), OpenTenantsMapEntry::Available(tenant));
                 }
                 Err(e) => {
                     error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -230,8 +254,18 @@ pub async fn shutdown_all_tenants() {
                 return;
             }
             TenantsMap::Open(tenants) => {
+                let tenants: HashMap<TenantId, Arc<Tenant>> = std::mem::take(tenants)
+                    .into_iter()
+                    .filter_map(|(tenant_id, tme)| {
+                        match tme {
+                            // don't give the waiters any more hope
+                            OpenTenantsMapEntry::InterestOnly(_) => None,
+                            OpenTenantsMapEntry::Available(tenant) => Some((tenant_id, tenant)),
+                        }
+                    })
+                    .collect();
                 let tenants_clone = tenants.clone();
-                *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
+                *m = TenantsMap::ShuttingDown(tenants);
                 tenants_clone
             }
             TenantsMap::ShuttingDown(_) => {
@@ -278,7 +312,7 @@ pub async fn create_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<Arc<Tenant>, TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         // We're holding the tenants lock in write mode while doing local IO.
         // If this section ever becomes contentious, introduce a new `TenantState::Creating`
         // and do the work in that state.
@@ -296,7 +330,6 @@ pub async fn create_tenant(
             tenant_id == crated_tenant_id,
             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
         );
-        vacant_entry.insert(Arc::clone(&created_tenant));
         Ok(created_tenant)
     }).await
 }
@@ -337,6 +370,71 @@ pub async fn get_tenant(
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum WaitForActiveTenantError {
+    #[error("Tenant map is still initializing")]
+    Initializing,
+    #[error("Tenant map is shutting down")]
+    ShuttingDown,
+    #[error(transparent)]
+    Tenant(anyhow::Error),
+}
+
+/// Cancellation-Safety: This function is cancellation-safe.
+pub(crate) async fn wait_for_active_tenant(
+    tenant_id: TenantId,
+) -> Result<Arc<Tenant>, WaitForActiveTenantError> {
+    let tenant = loop {
+        let r_guard = TENANTS.read().await;
+        let m = match &*r_guard {
+            TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
+            TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
+            TenantsMap::Open(m) => m,
+        };
+        if let Some(tme) = m.get(&tenant_id) {
+            match tme {
+                OpenTenantsMapEntry::InterestOnly(tx) => {
+                    let mut rx = tx.subscribe();
+                    drop(r_guard);
+                    break rx
+                        .recv()
+                        .await
+                        .map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
+                }
+                OpenTenantsMapEntry::Available(tenant) => break Arc::clone(tenant),
+            }
+        }
+        // TODO: would be nice to have upgradeable rwlock guard here
+        drop(r_guard);
+        let mut w_guard = TENANTS.write().await;
+        let m = match &mut *w_guard {
+            TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
+            TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
+            TenantsMap::Open(m) => m,
+        };
+        match m.entry(tenant_id) {
+            hash_map::Entry::Occupied(_e) => {
+                // the world changed while we weren't holding the lock
+                continue;
+            }
+            hash_map::Entry::Vacant(e) => {
+                let (tx, mut rx) = tokio::sync::broadcast::channel(1);
+                e.insert(OpenTenantsMapEntry::InterestOnly(tx));
+                drop(w_guard);
+                break rx
+                    .recv()
+                    .await
+                    .map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
+            }
+        }
+    };
+    tenant
+        .wait_to_become_active()
+        .await
+        .map_err(WaitForActiveTenantError::Tenant)?;
+    Ok(tenant)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum DeleteTimelineError {
     #[error("Tenant {0}")]
@@ -408,7 +506,7 @@ pub async fn load_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         let tenant_path = conf.tenant_path(&tenant_id);
         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id);
         if tenant_ignore_mark.exists() {
@@ -421,9 +519,9 @@ pub async fn load_tenant(
             format!("Failed to schedule tenant processing in path {tenant_path:?}")
         })?;
 
-        vacant_entry.insert(new_tenant);
-        Ok(())
-    }).await
+        Ok(new_tenant)
+    }).await?;
+    Ok(())
 }
 
 pub async fn ignore_tenant(
@@ -456,13 +554,20 @@ pub enum TenantMapListError {
 ///
 pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
     let tenants = TENANTS.read().await;
-    let m = match &*tenants {
-        TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
-        TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
-    };
-    Ok(m.iter()
-        .map(|(id, tenant)| (*id, tenant.current_state()))
-        .collect())
+    match &*tenants {
+        TenantsMap::Initializing => Err(TenantMapListError::Initializing),
+        TenantsMap::Open(m) => Ok(m
+            .iter()
+            .filter_map(|(id, tme)| match tme {
+                OpenTenantsMapEntry::InterestOnly(_) => None,
+                OpenTenantsMapEntry::Available(t) => Some((*id, t.current_state())),
+            })
+            .collect()),
+        TenantsMap::ShuttingDown(m) => Ok(m
+            .iter()
+            .map(|(id, tenant)| (*id, tenant.current_state()))
+            .collect()),
+    }
 }
 
 /// Execute Attach mgmt API command.
@@ -476,7 +581,7 @@ pub async fn attach_tenant(
     remote_storage: GenericRemoteStorage,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         let tenant_dir = create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Attach)?;
         // TODO: tenant directory remains on disk if we bail out from here on.
         // See https://github.com/neondatabase/neon/issues/4233
@@ -497,10 +602,10 @@ pub async fn attach_tenant(
             tenant_id == attached_tenant_id,
             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
         );
-        vacant_entry.insert(Arc::clone(&attached_tenant));
-        Ok(())
+        Ok(attached_tenant)
     })
-    .await
+    .await?;
+    Ok(())
 }
 #[derive(Debug, thiserror::Error)]
 pub enum TenantMapInsertError {
@@ -521,12 +626,12 @@ pub enum TenantMapInsertError {
 ///
 /// NB: the closure should return quickly because the current implementation of tenants map
 /// serializes access through an `RwLock`.
-async fn tenant_map_insert<V, F>(
+async fn tenant_map_insert<F>(
     tenant_id: TenantId,
     insert_fn: F,
-) -> Result<V, TenantMapInsertError>
+) -> Result<Arc<Tenant>, TenantMapInsertError>
 where
-    F: FnOnce(hash_map::VacantEntry<TenantId, Arc<Tenant>>) -> anyhow::Result<V>,
+    F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
 {
     let mut guard = TENANTS.write().await;
@@ -535,14 +640,29 @@ where
         TenantsMap::Open(m) => m,
     };
     match m.entry(tenant_id) {
-        hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
-            tenant_id,
-            e.get().current_state(),
-        )),
-        hash_map::Entry::Vacant(v) => match insert_fn(v) {
-            Ok(v) => Ok(v),
-            Err(e) => Err(TenantMapInsertError::Closure(e)),
+        hash_map::Entry::Occupied(mut e) => match e.get() {
+            OpenTenantsMapEntry::Available(tenant) => Err(
+                TenantMapInsertError::TenantAlreadyExists(tenant_id, tenant.current_state()),
+            ),
+            OpenTenantsMapEntry::InterestOnly(v) => {
+                let tenant = match insert_fn() {
+                    Ok(v) => v,
+                    Err(e) => return Err(TenantMapInsertError::Closure(e)),
+                };
+                // ignore if the interest has gone away
+                let _ = v.send(Arc::clone(&tenant));
+                e.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
+                Ok(tenant)
+            }
         },
+        hash_map::Entry::Vacant(v) => {
+            let tenant = match insert_fn() {
+                Ok(v) => v,
+                Err(e) => return Err(TenantMapInsertError::Closure(e)),
+            };
+            v.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
+            Ok(tenant)
+        }
     }
 }
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 6bf26f1da1..ac3f74ed2e 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -1,8 +1,6 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
 
-use std::ops::ControlFlow;
-use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use crate::context::{DownloadBehavior, RequestContext};
@@ -10,7 +8,6 @@ use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::mgr;
-use crate::tenant::{Tenant, TenantState};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TenantId;
@@ -60,15 +57,12 @@ async fn compaction_loop(tenant_id: TenantId) {
         loop {
             trace!("waking up");
 
-            let tenant = tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(tenant) => tenant,
-                },
+            let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
+                Ok(tenant) => tenant, // let's have another iteration
+                Err(e) => {
+                    info!("exiting: {e:#}");
+                    break;
+                }
             };
 
             let period = tenant.get_compaction_period();
@@ -127,20 +121,18 @@ async fn gc_loop(tenant_id: TenantId) {
         let cancel = task_mgr::shutdown_token();
         // GC might require downloading, to find the cutoff LSN that corresponds to the
         // cutoff specified as time.
-        let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
+        let ctx =
+            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
         let mut first = true;
         loop {
             trace!("waking up");
 
-            let tenant = tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(tenant) => tenant,
-                },
+            let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
+                Ok(tenant) => tenant, // let's have another iteration
+                Err(e) => {
+                    info!("exiting: {e:#}");
+                    break;
+                }
             };
 
             let period = tenant.get_gc_period();
@@ -161,7 +153,9 @@ async fn gc_loop(tenant_id: TenantId) {
                 Duration::from_secs(10)
             } else {
                 // Run gc
-                let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
+                let res = tenant
+                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
+                    .await;
                 if let Err(e) = res {
                     error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
                     wait_duration
@@ -187,49 +181,6 @@ async fn gc_loop(tenant_id: TenantId) {
     trace!("GC loop stopped.");
 }
 
-async fn wait_for_active_tenant(
-    tenant_id: TenantId,
-    wait: Duration,
-) -> ControlFlow<(), Arc<Tenant>> {
-    let tenant = loop {
-        match mgr::get_tenant(tenant_id, false).await {
-            Ok(tenant) => break tenant,
-            Err(e) => {
-                error!("Failed to get a tenant {tenant_id}: {e:#}");
-                tokio::time::sleep(wait).await;
-            }
-        }
-    };
-
-    // if the tenant has a proper status already, no need to wait for anything
-    if tenant.current_state() == TenantState::Active {
-        ControlFlow::Continue(tenant)
-    } else {
-        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
-        loop {
-            match tenant_state_updates.changed().await {
-                Ok(()) => {
-                    let new_state = &*tenant_state_updates.borrow();
-                    match new_state {
-                        TenantState::Active => {
-                            debug!("Tenant state changed to active, continuing the task loop");
-                            return ControlFlow::Continue(tenant);
-                        }
-                        state => {
-                            debug!("Not running the task loop, tenant is not active: {state:?}");
-                            continue;
-                        }
-                    }
-                }
-                Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
-                    return ControlFlow::Break(());
-                }
-            }
-        }
-    }
-}
-
 #[derive(thiserror::Error, Debug)]
 #[error("cancelled")]
 pub(crate) struct Cancelled;
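
Usage sketch (editor's illustration, not part of the patch): `mgr::wait_for_active_tenant`
is documented above as cancellation-safe, so a call site that still wants to react
promptly to pageserver shutdown can race it against the shutdown token, much like the
`tokio::select!` loops this patch removes. The names `cancel` and `tenant_id` are assumed
to come from the surrounding task, as in the loops above.

    // Hypothetical call site inside an async task that returns ().
    // `cancel` is the tokio_util::sync::CancellationToken from task_mgr::shutdown_token().
    let tenant = tokio::select! {
        _ = cancel.cancelled() => return,
        res = mgr::wait_for_active_tenant(tenant_id) => match res {
            Ok(tenant) => tenant, // Arc<Tenant> that has reached TenantState::Active
            Err(e) => {
                tracing::info!("exiting: {e:#}");
                return;
            }
        },
    };

Because the new function itself blocks until the tenant is registered and active, the
per-loop subscribe-and-poll dance that tasks.rs used to do is no longer needed at the
call sites.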