mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
pageserver: TenantManager support for SecondaryTenant
This commit is contained in:
@@ -39,6 +39,8 @@ use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::storage_layer::Layer;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
use super::TenantSharedResources;
|
||||
|
||||
@@ -53,7 +55,7 @@ use super::TenantSharedResources;
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
pub(crate) enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
Secondary(Arc<SecondaryTenant>),
|
||||
/// In this state, other administrative operations acting on the TenantId should
|
||||
/// block, or return a retry indicator equivalent to HTTP 503.
|
||||
InProgress(utils::completion::Barrier),
|
||||
@@ -63,7 +65,7 @@ impl std::fmt::Debug for TenantSlot {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
|
||||
Self::Secondary => write!(f, "Secondary"),
|
||||
Self::Secondary(_) => write!(f, "Secondary"),
|
||||
Self::InProgress(_) => write!(f, "InProgress"),
|
||||
}
|
||||
}
|
||||
@@ -74,7 +76,7 @@ impl TenantSlot {
|
||||
fn get_attached(&self) -> Option<&Arc<Tenant>> {
|
||||
match self {
|
||||
Self::Attached(t) => Some(t),
|
||||
Self::Secondary => None,
|
||||
Self::Secondary(_) => None,
|
||||
Self::InProgress(_) => None,
|
||||
}
|
||||
}
|
||||
@@ -427,7 +429,10 @@ pub async fn init_tenant_mgr(
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!(%tenant_id, "Loaded tenant in secondary mode");
|
||||
tenants.insert(tenant_id, TenantSlot::Secondary);
|
||||
tenants.insert(
|
||||
tenant_id,
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_id)),
|
||||
);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
// TODO: augment re-attach API to enable the control plane to
|
||||
@@ -586,8 +591,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
tenants_to_shut_down.push(t.clone());
|
||||
shutdown_state.insert(k, TenantSlot::Attached(t));
|
||||
}
|
||||
TenantSlot::Secondary => {
|
||||
shutdown_state.insert(k, TenantSlot::Secondary);
|
||||
TenantSlot::Secondary(state) => {
|
||||
// We don't need to wait for this individually per-tenant: the
|
||||
// downloader task will be waited on eventually, this cancel
|
||||
// is just to encourage it to drop out if it is doing work
|
||||
// for this tenant right now.
|
||||
state.cancel.cancel();
|
||||
|
||||
shutdown_state.insert(k, TenantSlot::Secondary(state));
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
@@ -764,6 +775,12 @@ pub(crate) async fn set_new_tenant_config(
|
||||
}
|
||||
|
||||
impl TenantManager {
|
||||
/// As a convenience to avoid carrying a configuration reference separately, anyone who
|
||||
/// as a reference to the TenantManager can access configuration this way.
|
||||
pub(crate) fn get_conf(&self) -> &'static PageServerConf {
|
||||
self.conf
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(%tenant_id))]
|
||||
pub(crate) async fn upsert_location(
|
||||
&self,
|
||||
@@ -811,43 +828,57 @@ impl TenantManager {
|
||||
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
|
||||
|
||||
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
}
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
Some(TenantSlot::Secondary(state)) => {
|
||||
info!("Shutting down secondary tenant");
|
||||
state.shutdown().await;
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// This should never happen: acquire_slot should error out
|
||||
// if the contents of a slot were InProgress.
|
||||
anyhow::bail!("Acquired an InProgress slot, this is a bug.")
|
||||
}
|
||||
None => {
|
||||
// Slot was vacant, nothing needs shutting down.
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_id);
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
let tenant_path = self.conf.tenant_path(&tenant_id);
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
unsafe_create_dir_all(&tenant_path)
|
||||
unsafe_create_dir_all(&timelines_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
@@ -855,11 +886,9 @@ impl TenantManager {
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
TenantSlot::Secondary
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_id))
|
||||
}
|
||||
LocationMode::Attached(_attach_config) => {
|
||||
let timelines_path = self.conf.timelines_path(&tenant_id);
|
||||
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
@@ -891,6 +920,97 @@ impl TenantManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Do some synchronous work for all tenant slots in Secondary state. The provided
|
||||
// callback should be small and fast, as it will be called inside the global
|
||||
// TenantsMap lock.
|
||||
pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
|
||||
where
|
||||
// TODO: let the callback return a hint to drop out of the loop early
|
||||
F: FnMut(&TenantId, &Arc<SecondaryTenant>),
|
||||
{
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
for (tenant_id, slot) in map {
|
||||
if let TenantSlot::Secondary(state) = slot {
|
||||
// Only expose secondary tenants that are not currently shutting down
|
||||
if !state.cancel.is_cancelled() {
|
||||
func(tenant_id, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_attached_tenants(&self) -> Vec<Arc<Tenant>> {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return Vec::new(),
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
map.iter()
|
||||
.filter_map(|(_k, v)| match v {
|
||||
TenantSlot::Attached(t) => Some(t.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Having planned some evictions for a tenant, attempt to execute them.
|
||||
///
|
||||
/// Execution will not occur if the TenantSlot for this tenant is not in
|
||||
/// a state suitable to execute.
|
||||
// TODO: is Layer really needed here? Maybe we should have reduced to a LayerFileName by this point.
|
||||
pub(crate) async fn evict_tenant_layers(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
timeline_layers: Vec<(TimelineId, Layer)>,
|
||||
) {
|
||||
// TODO: unify with how we evict for attached tenants. They should also
|
||||
// pass through here, to avoid attached tenant evictions racing with
|
||||
// the lifetime of secondary locations for the same tenant ID.
|
||||
|
||||
let state = {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
match map.get(tenant_id) {
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
// Found a secondary as expected
|
||||
secondary_state.clone()
|
||||
}
|
||||
_ => {
|
||||
// A location configuration change raced with this eviction
|
||||
tracing::info!(
|
||||
"Dropping {} layer evictions, tenant not in suitable state",
|
||||
timeline_layers.len()
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Concurrency: downloads might have been going on while we deleted layers. However,
|
||||
// we are only deleting layers that the SecondaryTenant already thought were on disk,
|
||||
// so we won't be deleting anything that it is _currently_ downloading. All deletions
|
||||
// of SecondaryTenant layers flow through this function, so there is no risk that the
|
||||
// layer we're evicting is no longer present in-memory.
|
||||
state
|
||||
.evict_layers(self.conf, tenant_id, timeline_layers)
|
||||
.instrument(tracing::info_span!("evict_layers",
|
||||
%tenant_id
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -937,7 +1057,7 @@ pub(crate) fn get_tenant(
|
||||
}
|
||||
},
|
||||
Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_id)),
|
||||
None | Some(TenantSlot::Secondary) => Err(GetTenantError::NotFound(tenant_id)),
|
||||
None | Some(TenantSlot::Secondary(_)) => Err(GetTenantError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -996,7 +1116,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
|
||||
_ => WaitFor::Tenant(tenant.clone()),
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary) => {
|
||||
Some(TenantSlot::Secondary(_)) => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_id,
|
||||
)))
|
||||
@@ -1303,7 +1423,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::Secondary(_) => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
.collect())
|
||||
@@ -1553,11 +1673,7 @@ impl SlotGuard {
|
||||
fn old_value_is_shutdown(&self) -> bool {
|
||||
match self.old_value.as_ref() {
|
||||
Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
|
||||
Some(TenantSlot::Secondary) => {
|
||||
// TODO: when adding secondary mode tenants, this will check for shutdown
|
||||
// in the same way that we do for `Tenant` above
|
||||
true
|
||||
}
|
||||
Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// A SlotGuard cannot be constructed for a slot that was already InProgress
|
||||
unreachable!()
|
||||
@@ -1761,26 +1877,19 @@ where
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(t)) => Some(t),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// allow pageserver shutdown to await for our completion
|
||||
let (_guard, progress) = completion::channel();
|
||||
|
||||
// If the tenant was attached, shut it down gracefully. For secondary
|
||||
// locations this part is not necessary
|
||||
match &attached_tenant {
|
||||
Some(attached_tenant) => {
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let freeze_and_flush = false;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match attached_tenant.shutdown(progress, freeze_and_flush).await {
|
||||
match tenant.shutdown(progress, freeze_and_flush).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
@@ -1789,11 +1898,19 @@ where
|
||||
return Err(TenantStateError::IsStopping(tenant_id));
|
||||
}
|
||||
}
|
||||
Some(tenant)
|
||||
}
|
||||
None => {
|
||||
// Nothing to wait on when not attached, proceed.
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
tracing::info!("Shutting down in secondary mode");
|
||||
secondary_state.shutdown().await;
|
||||
None
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// Acquiring a slot guarantees its old value was not InProgress
|
||||
unreachable!();
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user