Mirror of https://github.com/neondatabase/neon.git
add API to wait for a tenant to appear in the tenant map
I developed this before pivoting to https://github.com/neondatabase/neon/pull/4299 for the problem at hand. Still, an API to wait for a tenant to appear in the tenant map might be useful on its own. Includes a demo use case inside the tenant tasks.
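For orientation, here is a sketch of the intended call site, mirroring the demo this commit adds to the compaction and GC loops further down: the caller blocks until the tenant shows up in the map and becomes active, and treats any error (map initializing, map shutting down, tenant failed to activate) as a signal to exit. This is not a standalone program; `mgr`, `tenant_id` and the surrounding loop come from the pageserver crate.

```rust
// Caller-side sketch, mirroring the tenant-task demo in this diff.
let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
    Ok(tenant) => tenant,
    Err(e) => {
        // WaitForActiveTenantError: Initializing, ShuttingDown, or Tenant(_)
        info!("exiting: {e:#}");
        break;
    }
};
// From here on the tenant is known to be Active.
let period = tenant.get_compaction_period();
```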
@@ -32,23 +32,46 @@ enum TenantsMap {
     Initializing,
     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
     /// New tenants can be added using [`tenant_map_insert`].
-    Open(HashMap<TenantId, Arc<Tenant>>),
+    Open(HashMap<TenantId, OpenTenantsMapEntry>),
     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
     /// Existing tenants are still accessible, but no new tenants can be created.
     ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
 }

+enum OpenTenantsMapEntry {
+    InterestOnly(tokio::sync::broadcast::Sender<Arc<Tenant>>),
+    Available(Arc<Tenant>),
+}
+
 impl TenantsMap {
+    /// Return a ref to a tenant if it exists in the map.
+    /// If there's just interest in a tenant, but the tenant does not exist,
+    /// this function returns `None`.
     fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
         match self {
             TenantsMap::Initializing => None,
-            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
+            TenantsMap::Open(m) => match m.get(tenant_id) {
+                Some(tme) => match tme {
+                    OpenTenantsMapEntry::InterestOnly(_) => None,
+                    OpenTenantsMapEntry::Available(t) => Some(t),
+                },
+                None => None,
+            },
+            TenantsMap::ShuttingDown(m) => match m.get(tenant_id) {
+                Some(t) => Some(t),
+                None => None,
+            },
         }
     }
     fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
         match self {
             TenantsMap::Initializing => None,
-            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
+            TenantsMap::Open(m) => match m.remove(tenant_id) {
+                None => None,
+                Some(OpenTenantsMapEntry::InterestOnly(_)) => None,
+                Some(OpenTenantsMapEntry::Available(tenant)) => Some(tenant),
+            },
+            TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
         }
     }
 }
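The new `OpenTenantsMapEntry::InterestOnly` variant parks a `tokio::sync::broadcast::Sender` in the map so that a waiter can register interest in a tenant before it exists; whoever later inserts the tenant sends the `Arc<Tenant>` over that channel and swaps the entry to `Available`. Below is a minimal, self-contained sketch of that hand-off, using simplified stand-in types (`Entry`, `Tenant`, a `u32` key) rather than the real pageserver ones, and assuming tokio 1.x with the `sync`/`rt`/`macros` features.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{broadcast, RwLock};

// Simplified stand-ins for the pageserver types; names are illustrative only.
struct Tenant(&'static str);

enum Entry {
    // A waiter registered interest; the tenant itself does not exist yet.
    InterestOnly(broadcast::Sender<Arc<Tenant>>),
    // The tenant is present in the map.
    Available(Arc<Tenant>),
}

#[tokio::main]
async fn main() {
    let map: Arc<RwLock<HashMap<u32, Entry>>> = Arc::new(RwLock::new(HashMap::new()));

    // Waiter side: park a Sender in the map, keep the matching Receiver.
    let mut rx = {
        let mut m = map.write().await;
        let (tx, rx) = broadcast::channel(1);
        m.insert(42, Entry::InterestOnly(tx));
        rx
    };

    // Inserter side (e.g. tenant creation): notify any registered interest,
    // then replace the entry with the real value.
    let inserter = {
        let map = Arc::clone(&map);
        tokio::spawn(async move {
            let tenant = Arc::new(Tenant("created"));
            let mut m = map.write().await;
            if let Some(Entry::InterestOnly(tx)) = m.get(&42) {
                // ignore the error if the interest has gone away
                let _ = tx.send(Arc::clone(&tenant));
            }
            m.insert(42, Entry::Available(tenant));
        })
    };

    // The waiter wakes up as soon as the inserter publishes the tenant.
    let tenant = rx.recv().await.expect("sender dropped, i.e. shutting down");
    println!("woke up with tenant {}", tenant.0);
    inserter.await.unwrap();
}
```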
@@ -120,7 +143,8 @@ pub async fn init_tenant_mgr(
             &ctx,
         ) {
             Ok(tenant) => {
-                tenants.insert(tenant.tenant_id(), tenant);
+                tenants
+                    .insert(tenant.tenant_id(), OpenTenantsMapEntry::Available(tenant));
             }
             Err(e) => {
                 error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -230,8 +254,18 @@ pub async fn shutdown_all_tenants() {
             return;
         }
         TenantsMap::Open(tenants) => {
+            let tenants: HashMap<TenantId, Arc<Tenant>> = std::mem::take(tenants)
+                .into_iter()
+                .filter_map(|(tenant_id, tme)| {
+                    match tme {
+                        // don't give the waiters any more hope
+                        OpenTenantsMapEntry::InterestOnly(_) => None,
+                        OpenTenantsMapEntry::Available(tenant) => Some((tenant_id, tenant)),
+                    }
+                })
+                .collect();
             let tenants_clone = tenants.clone();
-            *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
+            *m = TenantsMap::ShuttingDown(tenants);
             tenants_clone
         }
         TenantsMap::ShuttingDown(_) => {
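On shutdown, `InterestOnly` entries are filtered out and dropped without ever being fulfilled ("don't give the waiters any more hope"): dropping the parked `broadcast::Sender` makes every subscribed receiver's `recv()` return `RecvError::Closed`, which `wait_for_active_tenant` (added further below) maps to `ShuttingDown`. A tiny sketch of that tokio behaviour, assuming tokio 1.x:

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(1);

    // Dropping the only Sender closes the channel: a pending or future recv()
    // observes RecvError::Closed instead of waiting forever.
    drop(tx);

    match rx.recv().await {
        Err(broadcast::error::RecvError::Closed) => println!("channel closed"),
        other => println!("unexpected: {other:?}"),
    }
}
```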
@@ -278,7 +312,7 @@ pub async fn create_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<Arc<Tenant>, TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         // We're holding the tenants lock in write mode while doing local IO.
         // If this section ever becomes contentious, introduce a new `TenantState::Creating`
         // and do the work in that state.
@@ -296,7 +330,6 @@ pub async fn create_tenant(
             tenant_id == crated_tenant_id,
             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
         );
-        vacant_entry.insert(Arc::clone(&created_tenant));
         Ok(created_tenant)
     }).await
 }
@@ -337,6 +370,71 @@ pub async fn get_tenant(
     }
 }

+#[derive(Debug, thiserror::Error)]
+pub(crate) enum WaitForActiveTenantError {
+    #[error("Tenant map is still initializing")]
+    Initializing,
+    #[error("Tenant map is shutting down")]
+    ShuttingDown,
+    #[error(transparent)]
+    Tenant(anyhow::Error),
+}
+
+/// Cancellation-Safety: This function is cancellation-safe.
+pub(crate) async fn wait_for_active_tenant(
+    tenant_id: TenantId,
+) -> Result<Arc<Tenant>, WaitForActiveTenantError> {
+    let tenant = loop {
+        let r_guard = TENANTS.read().await;
+        let m = match &*r_guard {
+            TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
+            TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
+            TenantsMap::Open(m) => m,
+        };
+        if let Some(tme) = m.get(&tenant_id) {
+            match tme {
+                OpenTenantsMapEntry::InterestOnly(tx) => {
+                    let mut rx = tx.subscribe();
+                    drop(r_guard);
+                    break rx
+                        .recv()
+                        .await
+                        .map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
+                }
+                OpenTenantsMapEntry::Available(tenant) => break Arc::clone(tenant),
+            }
+        }
+        // TODO: would be nice to have upgradeable rwlock guard here
+        drop(r_guard);
+        let mut w_guard = TENANTS.write().await;
+        let m = match &mut *w_guard {
+            TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
+            TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
+            TenantsMap::Open(m) => m,
+        };
+        match m.entry(tenant_id) {
+            hash_map::Entry::Occupied(_e) => {
+                // the world changed while we weren't holding the lock
+                continue;
+            }
+            hash_map::Entry::Vacant(e) => {
+                let (tx, mut rx) = tokio::sync::broadcast::channel(1);
+                e.insert(OpenTenantsMapEntry::InterestOnly(tx));
+                drop(w_guard);
+                break rx
+                    .recv()
+                    .await
+                    .map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
+            }
+        }
+    };
+    tenant
+        .wait_to_become_active()
+        .await
+        .map_err(WaitForActiveTenantError::Tenant)?;
+    Ok(tenant)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum DeleteTimelineError {
     #[error("Tenant {0}")]
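The function above has to drop the read guard before taking the write guard (tokio's `RwLock` has no upgradeable guard, hence the TODO), so by the time it holds the write lock the map may already contain the key; that is why the `Occupied` arm just `continue`s and re-runs the read path. The same check / upgrade / re-check shape in isolation, with plain `u32`/`String` stand-ins instead of the pageserver types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use tokio::sync::RwLock;

// Illustrative only: the "drop read guard, take write guard, re-check" dance.
// Between the two lock acquisitions another task may have inserted the key,
// so the write path must handle the Occupied case by looping back.
async fn get_or_insert(map: &RwLock<HashMap<u32, String>>, key: u32) -> String {
    loop {
        // Fast path under the read lock.
        if let Some(v) = map.read().await.get(&key) {
            return v.clone();
        }
        // Slow path: the read guard is gone by now, so re-check under the
        // write lock before inserting.
        let mut m = map.write().await;
        match m.entry(key) {
            // The world changed while we weren't holding the lock: retry.
            Entry::Occupied(_) => continue,
            Entry::Vacant(e) => {
                e.insert(format!("value-{key}"));
                // Next iteration hits the fast path and returns the value.
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let map = RwLock::new(HashMap::new());
    println!("{}", get_or_insert(&map, 7).await);
}
```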
@@ -408,7 +506,7 @@ pub async fn load_tenant(
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         let tenant_path = conf.tenant_path(&tenant_id);
         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id);
         if tenant_ignore_mark.exists() {
@@ -421,9 +519,9 @@ pub async fn load_tenant(
                 format!("Failed to schedule tenant processing in path {tenant_path:?}")
             })?;

-        vacant_entry.insert(new_tenant);
-        Ok(())
-    }).await
+        Ok(new_tenant)
+    }).await?;
+    Ok(())
 }

 pub async fn ignore_tenant(
@@ -456,13 +554,20 @@ pub enum TenantMapListError {
 ///
 pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
     let tenants = TENANTS.read().await;
-    let m = match &*tenants {
-        TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
-        TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
-    };
-    Ok(m.iter()
-        .map(|(id, tenant)| (*id, tenant.current_state()))
-        .collect())
+    match &*tenants {
+        TenantsMap::Initializing => Err(TenantMapListError::Initializing),
+        TenantsMap::Open(m) => Ok(m
+            .iter()
+            .filter_map(|(id, tme)| match tme {
+                OpenTenantsMapEntry::InterestOnly(_) => None,
+                OpenTenantsMapEntry::Available(t) => Some((*id, t.current_state())),
+            })
+            .collect()),
+        TenantsMap::ShuttingDown(m) => Ok(m
+            .iter()
+            .map(|(id, tenant)| (*id, tenant.current_state()))
+            .collect()),
+    }
 }

 /// Execute Attach mgmt API command.
@@ -476,7 +581,7 @@ pub async fn attach_tenant(
     remote_storage: GenericRemoteStorage,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    tenant_map_insert(tenant_id, |vacant_entry| {
+    tenant_map_insert(tenant_id, || {
         let tenant_dir = create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Attach)?;
         // TODO: tenant directory remains on disk if we bail out from here on.
         // See https://github.com/neondatabase/neon/issues/4233
@@ -497,10 +602,10 @@ pub async fn attach_tenant(
             tenant_id == attached_tenant_id,
             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
         );
-        vacant_entry.insert(Arc::clone(&attached_tenant));
-        Ok(())
+        Ok(attached_tenant)
     })
-    .await
+    .await?;
+    Ok(())
 }

 #[derive(Debug, thiserror::Error)]
@@ -521,12 +626,12 @@ pub enum TenantMapInsertError {
 ///
 /// NB: the closure should return quickly because the current implementation of tenants map
 /// serializes access through an `RwLock`.
-async fn tenant_map_insert<F, V>(
+async fn tenant_map_insert<F>(
     tenant_id: TenantId,
     insert_fn: F,
-) -> Result<V, TenantMapInsertError>
+) -> Result<Arc<Tenant>, TenantMapInsertError>
 where
-    F: FnOnce(hash_map::VacantEntry<TenantId, Arc<Tenant>>) -> anyhow::Result<V>,
+    F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
 {
     let mut guard = TENANTS.write().await;
     let m = match &mut *guard {
@@ -535,14 +640,29 @@ where
         TenantsMap::Open(m) => m,
     };
     match m.entry(tenant_id) {
-        hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
-            tenant_id,
-            e.get().current_state(),
-        )),
-        hash_map::Entry::Vacant(v) => match insert_fn(v) {
-            Ok(v) => Ok(v),
-            Err(e) => Err(TenantMapInsertError::Closure(e)),
+        hash_map::Entry::Occupied(mut e) => match e.get() {
+            OpenTenantsMapEntry::Available(tenant) => Err(
+                TenantMapInsertError::TenantAlreadyExists(tenant_id, tenant.current_state()),
+            ),
+            OpenTenantsMapEntry::InterestOnly(v) => {
+                let tenant = match insert_fn() {
+                    Ok(v) => v,
+                    Err(e) => return Err(TenantMapInsertError::Closure(e)),
+                };
+                // ignore if the interest has gone away
+                let _ = v.send(Arc::clone(&tenant));
+                e.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
+                Ok(tenant)
+            }
         },
+        hash_map::Entry::Vacant(v) => {
+            let tenant = match insert_fn() {
+                Ok(v) => v,
+                Err(e) => return Err(TenantMapInsertError::Closure(e)),
+            };
+            v.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
+            Ok(tenant)
+        }
     }
 }

@@ -1,8 +1,6 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC

-use std::ops::ControlFlow;
-use std::sync::Arc;
 use std::time::{Duration, Instant};

 use crate::context::{DownloadBehavior, RequestContext};
@@ -10,7 +8,6 @@ use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::mgr;
-use crate::tenant::{Tenant, TenantState};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TenantId;
@@ -60,15 +57,12 @@ async fn compaction_loop(tenant_id: TenantId) {
     loop {
         trace!("waking up");

-        let tenant = tokio::select! {
-            _ = cancel.cancelled() => {
-                info!("received cancellation request");
-                return;
-            },
-            tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
-                ControlFlow::Break(()) => return,
-                ControlFlow::Continue(tenant) => tenant,
-            },
+        let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
+            Ok(tenant) => tenant, // let's have another iteration
+            Err(e) => {
+                info!("exiting: {e:#}");
+                break;
+            }
         };

         let period = tenant.get_compaction_period();
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let ctx =
|
||||
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
let tenant = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
|
||||
Ok(tenant) => tenant, // let's have another iteration
|
||||
Err(e) => {
|
||||
info!("exiting: {e:#}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let period = tenant.get_gc_period();
|
||||
@@ -161,7 +153,9 @@ async fn gc_loop(tenant_id: TenantId) {
                 Duration::from_secs(10)
             } else {
                 // Run gc
-                let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
+                let res = tenant
+                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
+                    .await;
                 if let Err(e) = res {
                     error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
                     wait_duration
@@ -187,49 +181,6 @@ async fn gc_loop(tenant_id: TenantId) {
     trace!("GC loop stopped.");
 }

-async fn wait_for_active_tenant(
-    tenant_id: TenantId,
-    wait: Duration,
-) -> ControlFlow<(), Arc<Tenant>> {
-    let tenant = loop {
-        match mgr::get_tenant(tenant_id, false).await {
-            Ok(tenant) => break tenant,
-            Err(e) => {
-                error!("Failed to get a tenant {tenant_id}: {e:#}");
-                tokio::time::sleep(wait).await;
-            }
-        }
-    };
-
-    // if the tenant has a proper status already, no need to wait for anything
-    if tenant.current_state() == TenantState::Active {
-        ControlFlow::Continue(tenant)
-    } else {
-        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
-        loop {
-            match tenant_state_updates.changed().await {
-                Ok(()) => {
-                    let new_state = &*tenant_state_updates.borrow();
-                    match new_state {
-                        TenantState::Active => {
-                            debug!("Tenant state changed to active, continuing the task loop");
-                            return ControlFlow::Continue(tenant);
-                        }
-                        state => {
-                            debug!("Not running the task loop, tenant is not active: {state:?}");
-                            continue;
-                        }
-                    }
-                }
-                Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
-                    return ControlFlow::Break(());
-                }
-            }
-        }
-    }
-}
-
 #[derive(thiserror::Error, Debug)]
 #[error("cancelled")]
 pub(crate) struct Cancelled;