Remove global static TENANTS (#12169)

## Problem
There is this TODO in code:
https://github.com/neondatabase/neon/blob/main/pageserver/src/tenant/mgr.rs#L300-L302
This is an old TODO by @jcsp.

## Summary of changes
This PR addresses the TODO. Specifically, it removes a global static
`TENANTS`. Instead the `TenantManager` now directly manages the tenant
map. Enhancing abstraction.

Essentially, this PR moves all module-level methods to inside the
implementation of `TenantManager`.
This commit is contained in:
Trung Dinh
2025-06-10 02:26:40 -07:00
committed by GitHub
parent 58327ef74d
commit 02f94edb60
2 changed files with 388 additions and 344 deletions

View File

@@ -573,7 +573,8 @@ fn start_pageserver(
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = mgr::init(
conf,
background_purges.clone(),
TenantSharedResources {
@@ -584,10 +585,10 @@ fn start_pageserver(
basebackup_prepare_sender,
feature_resolver,
},
order,
shutdown_pageserver.clone(),
))?;
);
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),

View File

@@ -12,7 +12,6 @@ use anyhow::Context;
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
use pageserver_api::shard::{
@@ -103,7 +102,7 @@ pub(crate) enum TenantsMap {
/// [`init_tenant_mgr`] is not done yet.
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
/// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
/// Existing tenants are still accessible, but no new tenants can be created.
@@ -284,9 +283,6 @@ impl BackgroundPurges {
}
}
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
@@ -297,10 +293,7 @@ static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
/// and attached modes concurrently.
pub struct TenantManager {
conf: &'static PageServerConf,
// TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
// out of that static variable, the TenantManager can own this.
// See https://github.com/neondatabase/neon/issues/5796
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: std::sync::RwLock<TenantsMap>,
resources: TenantSharedResources,
// Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
@@ -479,21 +472,43 @@ pub(crate) enum DeleteTenantError {
Other(#[from] anyhow::Error),
}
/// Initialize repositories with locally available timelines.
/// Initialize repositories at `Initializing` state.
pub fn init(
conf: &'static PageServerConf,
background_purges: BackgroundPurges,
resources: TenantSharedResources,
cancel: CancellationToken,
) -> TenantManager {
TenantManager {
conf,
tenants: std::sync::RwLock::new(TenantsMap::Initializing),
resources,
cancel,
background_purges,
}
}
/// Transition repositories from `Initializing` state to `Open` state with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
background_purges: BackgroundPurges,
resources: TenantSharedResources,
tenant_manager: Arc<TenantManager>,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
) -> anyhow::Result<()> {
debug_assert!(matches!(
*tenant_manager.tenants.read().unwrap(),
TenantsMap::Initializing
));
let mut tenants = BTreeMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
let conf = tenant_manager.conf;
let resources = &tenant_manager.resources;
let cancel = &tenant_manager.cancel;
let background_purges = &tenant_manager.background_purges;
// Initialize dynamic limits that depend on system resources
let system_memory =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
@@ -512,7 +527,7 @@ pub async fn init_tenant_mgr(
let tenant_configs = init_load_tenant_configs(conf).await;
// Determine which tenants are to be secondary or attached, and in which generation
let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
tracing::info!(
"Attaching {} tenants at startup, warming up {} at a time",
@@ -669,18 +684,10 @@ pub async fn init_tenant_mgr(
info!("Processed {} local tenants at startup", tenants.len());
let mut tenants_map = TENANTS.write().unwrap();
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
let mut tenant_map = tenant_manager.tenants.write().unwrap();
*tenant_map = TenantsMap::Open(tenants);
*tenants_map = TenantsMap::Open(tenants);
Ok(TenantManager {
conf,
tenants: &TENANTS,
resources,
cancel: CancellationToken::new(),
background_purges,
})
Ok(())
}
/// Wrapper for Tenant::spawn that checks invariants before running
@@ -719,142 +726,6 @@ fn tenant_spawn(
)
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
{
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
// as it happens implicitly at the end of tests etc.
let m = tenants.read().unwrap();
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
}
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary(state) => {
// We don't need to wait for this individually per-tenant: the
// downloader task will be waited on eventually, this cancel
// is just to encourage it to drop out if it is doing work
// for this tenant right now.
state.cancel.cancel();
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
// wait for their notifications to fire in this function.
join_set.spawn(async move {
notify.wait().await;
});
total_in_progress += 1;
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
}
TenantsMap::ShuttingDown(_) => {
error!(
"already shutting down, this function isn't supposed to be called more than once"
);
return;
}
}
};
let started_at = std::time::Instant::now();
info!(
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
total_in_progress, total_attached
);
let total = join_set.len();
let mut panicked = 0;
let mut buffering = true;
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
while !join_set.is_empty() {
tokio::select! {
Some(joined) = join_set.join_next() => {
match joined {
Ok(()) => {},
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the tasks");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
}
if !buffering {
// buffer so that every 500ms since the first update (or starting) we'll log
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
// are not able to log *then*.
buffering = true;
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
}
},
_ = &mut buffered, if buffering => {
buffering = false;
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
}
}
}
if panicked > 0 {
warn!(
panicked,
total, "observed panicks while shutting down tenants"
);
}
// caller will log how long we took
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum UpsertLocationError {
#[error("Bad config request: {0}")]
@@ -1056,7 +927,8 @@ impl TenantManager {
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
let mut slot_guard = self
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
.map_err(|e| match e {
TenantSlotError::NotFound(_) => {
unreachable!("Called with mode Any")
@@ -1223,6 +1095,75 @@ impl TenantManager {
}
}
fn tenant_map_acquire_slot(
&self,
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
use TenantSlotAcquireMode::*;
METRICS.tenant_slot_writes.inc();
let mut locked = self.tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
let inserting = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&inserting);
v.insert(inserting);
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
None,
completion,
&self.tenants,
))
}
},
Entry::Occupied(mut o) => {
// Apply mode-driven checks
match (o.get(), mode) {
(TenantSlot::InProgress(_), _) => {
tracing::debug!("Occupied, failing for InProgress");
Err(TenantSlotError::InProgress)
}
_ => {
// Happy case: the slot was not in any state that violated our mode
let (completion, barrier) = utils::completion::channel();
let in_progress = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&in_progress);
let old_value = o.insert(in_progress);
METRICS.slot_removed(&old_value);
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
&self.tenants,
))
}
}
}
}
}
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
/// dropped before re-attaching.
@@ -1239,7 +1180,8 @@ impl TenantManager {
drop_cache: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let mut slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!("Tenant not found when trying to reset");
};
@@ -1388,7 +1330,8 @@ impl TenantManager {
Ok(())
}
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
match &slot_guard.old_value {
Some(TenantSlot::Attached(tenant)) => {
// Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
@@ -1539,7 +1482,7 @@ impl TenantManager {
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
drop(tenant);
let mut parent_slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let parent = match parent_slot_guard.get_old_value() {
Some(TenantSlot::Attached(t)) => t,
Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
@@ -1843,7 +1786,145 @@ impl TenantManager {
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
shutdown_all_tenants0(self.tenants).await
self.shutdown_all_tenants0().await
}
async fn shutdown_all_tenants0(&self) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
{
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
// as it happens implicitly at the end of tests etc.
let m = self.tenants.read().unwrap();
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
}
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
let mut m = self.tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state
.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary(state) => {
// We don't need to wait for this individually per-tenant: the
// downloader task will be waited on eventually, this cancel
// is just to encourage it to drop out if it is doing work
// for this tenant right now.
state.cancel.cancel();
shutdown_state
.insert(tenant_shard_id, TenantSlot::Secondary(state));
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
// wait for their notifications to fire in this function.
join_set.spawn(async move {
notify.wait().await;
});
total_in_progress += 1;
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
}
TenantsMap::ShuttingDown(_) => {
error!(
"already shutting down, this function isn't supposed to be called more than once"
);
return;
}
}
};
let started_at = std::time::Instant::now();
info!(
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
total_in_progress, total_attached
);
let total = join_set.len();
let mut panicked = 0;
let mut buffering = true;
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
while !join_set.is_empty() {
tokio::select! {
Some(joined) = join_set.join_next() => {
match joined {
Ok(()) => {},
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the tasks");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
}
if !buffering {
// buffer so that every 500ms since the first update (or starting) we'll log
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
// are not able to log *then*.
buffering = true;
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
}
},
_ = &mut buffered, if buffering => {
buffering = false;
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
}
}
}
if panicked > 0 {
warn!(
panicked,
total, "observed panicks while shutting down tenants"
);
}
// caller will log how long we took
}
/// Detaches a tenant, and removes its local files asynchronously.
@@ -1889,12 +1970,12 @@ impl TenantManager {
.map(Some)
};
let mut removal_result = remove_tenant_from_memory(
self.tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
let mut removal_result = self
.remove_tenant_from_memory(
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// If the tenant was not found, it was likely already removed. Attempt to remove the tenant
// directory on disk anyway. For example, during shard splits, we shut down and remove the
@@ -1948,17 +2029,16 @@ impl TenantManager {
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
use detach_ancestor::Error;
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
|e| {
use TenantSlotError::*;
let slot_guard = self
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
.map_err(|e| {
use TenantSlotError::*;
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
},
)?;
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
})?;
let tenant = {
let old_slot = slot_guard
@@ -2291,6 +2371,80 @@ impl TenantManager {
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
async fn remove_tenant_from_memory<V, F>(
&self,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
let mut slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
let attached_tenant = match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let shutdown_mode = ShutdownMode::Hard;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, shutdown_mode).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_shard_id));
}
}
Some(tenant)
}
Some(TenantSlot::Secondary(secondary_state)) => {
tracing::info!("Shutting down in secondary mode");
secondary_state.shutdown().await;
None
}
Some(TenantSlot::InProgress(_)) => {
// Acquiring a slot guarantees its old value was not InProgress
unreachable!();
}
None => None,
};
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
slot_guard
.drop_old_value()
.expect("We just called shutdown");
Ok(hook_value)
}
Err(e) => {
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
if let Some(attached_tenant) = attached_tenant {
attached_tenant.set_broken(e.to_string()).await;
}
// Leave the broken tenant in the map
slot_guard.revert();
Err(TenantStateError::Other(e))
}
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -2455,7 +2609,7 @@ pub(crate) enum TenantMapError {
/// this tenant to retry later, or wait for the InProgress state to end.
///
/// This structure enforces the important invariant that we do not have overlapping
/// tasks that will try use local storage for a the same tenant ID: we enforce that
/// tasks that will try to use local storage for a the same tenant ID: we enforce that
/// the previous contents of a slot have been shut down before the slot can be
/// left empty or used for something else
///
@@ -2468,7 +2622,7 @@ pub(crate) enum TenantMapError {
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub(crate) struct SlotGuard {
pub(crate) struct SlotGuard<'a> {
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -2476,19 +2630,23 @@ pub(crate) struct SlotGuard {
/// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
/// release any waiters as soon as this SlotGuard is dropped.
completion: utils::completion::Completion,
tenants: &'a std::sync::RwLock<TenantsMap>,
}
impl SlotGuard {
impl<'a> SlotGuard<'a> {
fn new(
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
completion: utils::completion::Completion,
tenants: &'a std::sync::RwLock<TenantsMap>,
) -> Self {
Self {
tenant_shard_id,
old_value,
upserted: false,
completion,
tenants,
}
}
@@ -2512,8 +2670,8 @@ impl SlotGuard {
));
}
let replaced = {
let mut locked = TENANTS.write().unwrap();
let replaced: Option<TenantSlot> = {
let mut locked = self.tenants.write().unwrap();
if let TenantSlot::InProgress(_) = new_value {
// It is never expected to try and upsert InProgress via this path: it should
@@ -2621,7 +2779,7 @@ impl SlotGuard {
}
}
impl Drop for SlotGuard {
impl<'a> Drop for SlotGuard<'a> {
fn drop(&mut self) {
if self.upserted {
return;
@@ -2629,7 +2787,7 @@ impl Drop for SlotGuard {
// Our old value is already shutdown, or it never existed: it is safe
// for us to fully release the TenantSlot back into an empty state
let mut locked = TENANTS.write().unwrap();
let mut locked = self.tenants.write().unwrap();
let m = match &mut *locked {
TenantsMap::Initializing => {
@@ -2711,151 +2869,6 @@ enum TenantSlotAcquireMode {
MustExist,
}
fn tenant_map_acquire_slot(
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
}
fn tenant_map_acquire_slot_impl(
tenant_shard_id: &TenantShardId,
tenants: &std::sync::RwLock<TenantsMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
use TenantSlotAcquireMode::*;
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
let inserting = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&inserting);
v.insert(inserting);
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
}
},
Entry::Occupied(mut o) => {
// Apply mode-driven checks
match (o.get(), mode) {
(TenantSlot::InProgress(_), _) => {
tracing::debug!("Occupied, failing for InProgress");
Err(TenantSlotError::InProgress)
}
_ => {
// Happy case: the slot was not in any state that violated our mode
let (completion, barrier) = utils::completion::channel();
let in_progress = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&in_progress);
let old_value = o.insert(in_progress);
METRICS.slot_removed(&old_value);
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
))
}
}
}
}
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
let attached_tenant = match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let shutdown_mode = ShutdownMode::Hard;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, shutdown_mode).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_shard_id));
}
}
Some(tenant)
}
Some(TenantSlot::Secondary(secondary_state)) => {
tracing::info!("Shutting down in secondary mode");
secondary_state.shutdown().await;
None
}
Some(TenantSlot::InProgress(_)) => {
// Acquiring a slot guarantees its old value was not InProgress
unreachable!();
}
None => None,
};
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
slot_guard
.drop_old_value()
.expect("We just called shutdown");
Ok(hook_value)
}
Err(e) => {
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
if let Some(attached_tenant) = attached_tenant {
attached_tenant.set_broken(e.to_string()).await;
}
// Leave the broken tenant in the map
slot_guard.revert();
Err(TenantStateError::Other(e))
}
}
}
use http_utils::error::ApiError;
use pageserver_api::models::TimelineGcRequest;
@@ -2866,11 +2879,15 @@ mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use storage_broker::BrokerClientChannel;
use tracing::Instrument;
use super::super::harness::TenantHarness;
use super::TenantsMap;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::{
TenantSharedResources,
mgr::{BackgroundPurges, TenantManager, TenantSlot},
};
#[tokio::test(start_paused = true)]
async fn shutdown_awaits_in_progress_tenant() {
@@ -2891,23 +2908,47 @@ mod tests {
let _e = span.enter();
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
// permit it to proceed: that will stick the tenant in InProgress
let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
crate::basebackup_cache::BasebackupPrepareRequest,
>();
let tenant_manager = TenantManager {
tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
conf: h.conf,
resources: TenantSharedResources {
broker_client: BrokerClientChannel::connect_lazy("foobar.com")
.await
.unwrap(),
remote_storage: h.remote_storage.clone(),
deletion_queue_client: h.deletion_queue.new_client(),
l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
h.conf.l0_flush.clone(),
),
basebackup_prepare_sender,
feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
},
cancel: tokio_util::sync::CancellationToken::new(),
background_purges: BackgroundPurges::default(),
};
let tenant_manager = Arc::new(tenant_manager);
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
let (until_cleanup_started, cleanup_started) = utils::completion::channel();
let mut remove_tenant_from_memory_task = {
let tenant_manager = tenant_manager.clone();
let jh = tokio::spawn({
let tenants = tenants.clone();
async move {
let cleanup = async move {
drop(until_cleanup_started);
can_complete_cleanup.wait().await;
anyhow::Ok(())
};
super::remove_tenant_from_memory(&tenants, id, cleanup).await
tenant_manager.remove_tenant_from_memory(id, cleanup).await
}
.instrument(h.span())
});
@@ -2920,9 +2961,11 @@ mod tests {
let mut shutdown_task = {
let (until_shutdown_started, shutdown_started) = utils::completion::channel();
let tenant_manager = tenant_manager.clone();
let shutdown_task = tokio::spawn(async move {
drop(until_shutdown_started);
super::shutdown_all_tenants0(&tenants).await;
tenant_manager.shutdown_all_tenants0().await;
});
shutdown_started.wait().await;