mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
storcon: also support tenant deletion for safekeepers (#11289)
If a tenant gets deleted, also delete all of its timelines. We assume that by the time a tenant is being deleted, no new timelines are being created, so we don't need to worry about races with creation in this situation. Unlike #11233, which was very simple because it listed the timelines and invoked timeline deletion, this PR obtains a list of safekeepers to invoke the tenant deletion on, and then invokes tenant deletion on each safekeeper that has one or more timelines. Alternative to #11233. Builds on #11288. Part of #9011.
This commit is contained in:
@@ -120,6 +120,12 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn bump_timeline_term(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -1367,6 +1367,34 @@ impl Persistence {
|
||||
|
||||
Ok(timeline_from_db)
|
||||
}
|
||||
|
||||
/// Loads a list of all timelines from database.
|
||||
pub(crate) async fn list_timelines_for_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> DatabaseResult<Vec<TimelinePersistence>> {
|
||||
use crate::schema::timelines::dsl;
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timelines = self
|
||||
.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
|
||||
Box::pin(async move {
|
||||
let timelines: Vec<TimelineFromDb> = dsl::timelines
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.load(conn)
|
||||
.await?;
|
||||
Ok(timelines)
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
let timelines = timelines
|
||||
.into_iter()
|
||||
.map(TimelineFromDb::into_persistence)
|
||||
.collect();
|
||||
Ok(timelines)
|
||||
}
|
||||
|
||||
/// Persist pending op. Returns if it was newly inserted. If it wasn't, we haven't done any writes.
|
||||
pub(crate) async fn insert_pending_op(
|
||||
&self,
|
||||
@@ -1409,7 +1437,7 @@ impl Persistence {
|
||||
pub(crate) async fn remove_pending_op(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
sk_id: NodeId,
|
||||
generation: u32,
|
||||
) -> DatabaseResult<()> {
|
||||
@@ -1418,10 +1446,11 @@ impl Persistence {
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| {
|
||||
let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default();
|
||||
Box::pin(async move {
|
||||
diesel::delete(dsl::safekeeper_timeline_pending_ops)
|
||||
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(timeline_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(timeline_id_str))
|
||||
.filter(dsl::sk_id.eq(sk_id.0 as i64))
|
||||
.filter(dsl::generation.eq(generation as i32))
|
||||
.execute(conn)
|
||||
@@ -1461,6 +1490,34 @@ impl Persistence {
|
||||
|
||||
Ok(timeline_from_db)
|
||||
}
|
||||
|
||||
/// Delete all pending ops for the given timeline.
|
||||
///
|
||||
/// Use this only at timeline deletion, otherwise use generation based APIs
|
||||
pub(crate) async fn remove_pending_ops_for_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default();
|
||||
Box::pin(async move {
|
||||
diesel::delete(dsl::safekeeper_timeline_pending_ops)
|
||||
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(timeline_id_str))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
|
||||
@@ -98,6 +98,18 @@ impl SafekeeperClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<models::TimelineDeleteResult> {
|
||||
measured_request!(
|
||||
"delete_tenant",
|
||||
crate::metrics::Method::Delete,
|
||||
&self.node_id_label,
|
||||
self.inner.delete_tenant(tenant_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn pull_timeline(
|
||||
&self,
|
||||
req: &PullTimelineRequest,
|
||||
|
||||
@@ -3323,7 +3323,10 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
|
||||
pub(crate) async fn tenant_delete(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
let _tenant_lock =
|
||||
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
|
||||
|
||||
@@ -3431,6 +3434,11 @@ impl Service {
|
||||
);
|
||||
};
|
||||
|
||||
// Delete the tenant from safekeepers (if needed)
|
||||
self.tenant_delete_safekeepers(tenant_id)
|
||||
.instrument(tracing::info_span!("tenant_delete_safekeepers", %tenant_id))
|
||||
.await?;
|
||||
|
||||
// Success is represented as 404, to imitate the existing pageserver deletion API
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
@@ -51,6 +51,22 @@ impl SafekeeperReconcilers {
|
||||
handle.cancel.cancel();
|
||||
}
|
||||
}
|
||||
/// Cancel ongoing reconciles for the given timeline
|
||||
///
|
||||
/// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
|
||||
/// instead of doing this for all timelines of the tenant.
|
||||
///
|
||||
/// Callers must remove the reconciles from the db manually
|
||||
pub(crate) fn cancel_reconciles_for_timeline(
|
||||
&mut self,
|
||||
node_id: NodeId,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
) {
|
||||
if let Some(handle) = self.reconcilers.get(&node_id) {
|
||||
handle.cancel_reconciliation(tenant_id, timeline_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initial load of the pending operations from the db
|
||||
@@ -73,12 +89,21 @@ pub(crate) async fn load_schedule_requests(
|
||||
};
|
||||
let sk = Box::new(sk.clone());
|
||||
let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
|
||||
let timeline_id = TimelineId::from_str(&op_persist.timeline_id)?;
|
||||
let timeline_id = if !op_persist.timeline_id.is_empty() {
|
||||
Some(TimelineId::from_str(&op_persist.timeline_id)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let host_list = match op_persist.op_kind {
|
||||
SafekeeperTimelineOpKind::Delete => Vec::new(),
|
||||
SafekeeperTimelineOpKind::Exclude => Vec::new(),
|
||||
SafekeeperTimelineOpKind::Pull => {
|
||||
// TODO this code is super hacky, it doesn't take migrations into account
|
||||
let Some(timeline_id) = timeline_id else {
|
||||
anyhow::bail!(
|
||||
"timeline_id is empty for `pull` schedule request for {tenant_id}"
|
||||
);
|
||||
};
|
||||
let timeline_persist = service
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
@@ -129,14 +154,15 @@ pub(crate) struct ScheduleRequest {
|
||||
pub(crate) safekeeper: Box<Safekeeper>,
|
||||
pub(crate) host_list: Vec<(NodeId, String)>,
|
||||
pub(crate) tenant_id: TenantId,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
pub(crate) timeline_id: Option<TimelineId>,
|
||||
pub(crate) generation: u32,
|
||||
pub(crate) kind: SafekeeperTimelineOpKind,
|
||||
}
|
||||
|
||||
struct ReconcilerHandle {
|
||||
tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
|
||||
ongoing_tokens: Arc<ClashMap<(TenantId, TimelineId), Arc<CancellationToken>>>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), Arc<CancellationToken>>>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -145,7 +171,7 @@ impl ReconcilerHandle {
|
||||
fn new_token_slot(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
) -> Arc<CancellationToken> {
|
||||
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
|
||||
if let Entry::Occupied(entry) = &entry {
|
||||
@@ -154,6 +180,12 @@ impl ReconcilerHandle {
|
||||
}
|
||||
entry.insert(Arc::new(self.cancel.child_token())).clone()
|
||||
}
|
||||
/// Cancel an ongoing reconciliation
|
||||
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
|
||||
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
fn schedule_reconcile(&self, req: ScheduleRequest) {
|
||||
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let hostname = req.safekeeper.skp.host.clone();
|
||||
@@ -206,7 +238,7 @@ impl SafekeeperReconciler {
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
%tenant_id,
|
||||
%timeline_id
|
||||
?timeline_id
|
||||
))
|
||||
.await;
|
||||
}
|
||||
@@ -215,6 +247,12 @@ impl SafekeeperReconciler {
|
||||
let req_host = req.safekeeper.skp.host.clone();
|
||||
match req.kind {
|
||||
SafekeeperTimelineOpKind::Pull => {
|
||||
let Some(timeline_id) = req.timeline_id else {
|
||||
tracing::warn!(
|
||||
"ignoring invalid schedule request: timeline_id is empty for `pull`"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let our_id = req.safekeeper.get_id();
|
||||
let http_hosts = req
|
||||
.host_list
|
||||
@@ -225,7 +263,7 @@ impl SafekeeperReconciler {
|
||||
let pull_req = PullTimelineRequest {
|
||||
http_hosts,
|
||||
tenant_id: req.tenant_id,
|
||||
timeline_id: req.timeline_id,
|
||||
timeline_id,
|
||||
};
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
@@ -243,7 +281,12 @@ impl SafekeeperReconciler {
|
||||
SafekeeperTimelineOpKind::Exclude => {
|
||||
// TODO actually exclude instead of delete here
|
||||
let tenant_id = req.tenant_id;
|
||||
let timeline_id = req.timeline_id;
|
||||
let Some(timeline_id) = req.timeline_id else {
|
||||
tracing::warn!(
|
||||
"ignoring invalid schedule request: timeline_id is empty for `exclude`"
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
@@ -256,16 +299,27 @@ impl SafekeeperReconciler {
|
||||
}
|
||||
SafekeeperTimelineOpKind::Delete => {
|
||||
let tenant_id = req.tenant_id;
|
||||
let timeline_id = req.timeline_id;
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if let Some(timeline_id) = req.timeline_id {
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_tenant(tenant_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted tenant from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -8,6 +9,7 @@ use crate::persistence::{
|
||||
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
|
||||
};
|
||||
use crate::safekeeper::Safekeeper;
|
||||
use anyhow::Context;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
@@ -265,7 +267,8 @@ impl Service {
|
||||
.get(&sk.id)
|
||||
.ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Couldn't find safekeeper with id {remaining_id} to pull from"
|
||||
"Couldn't find safekeeper with id {} to pull from",
|
||||
sk.id
|
||||
))
|
||||
})?
|
||||
.base_url(),
|
||||
@@ -279,7 +282,7 @@ impl Service {
|
||||
safekeeper: Box::new(sk.clone()),
|
||||
host_list,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
generation: timeline_persist.generation as u32,
|
||||
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
|
||||
};
|
||||
@@ -340,7 +343,7 @@ impl Service {
|
||||
// we don't use this for this kind, put a dummy value
|
||||
host_list: Vec::new(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
generation: tl.generation as u32,
|
||||
kind,
|
||||
};
|
||||
@@ -350,6 +353,86 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform tenant deletion on safekeepers.
|
||||
pub(super) async fn tenant_delete_safekeepers(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), ApiError> {
|
||||
let timeline_list = self
|
||||
.persistence
|
||||
.list_timelines_for_tenant(tenant_id)
|
||||
.await?;
|
||||
|
||||
if timeline_list.is_empty() {
|
||||
// Early exit: the tenant is either empty or not migrated to the storcon yet
|
||||
tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timeline_list = timeline_list
|
||||
.into_iter()
|
||||
.map(|timeline| {
|
||||
let timeline_id = TimelineId::from_str(&timeline.timeline_id)
|
||||
.context("timeline id loaded from db")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
Ok((timeline_id, timeline))
|
||||
})
|
||||
.collect::<Result<Vec<_>, ApiError>>()?;
|
||||
|
||||
// Remove pending ops from db.
|
||||
// We cancel them in a later iteration once we hold the state lock.
|
||||
for (timeline_id, _timeline) in timeline_list.iter() {
|
||||
self.persistence
|
||||
.remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
|
||||
.await?;
|
||||
}
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
|
||||
// The list of safekeepers that have any of the timelines
|
||||
let mut sk_list = HashSet::new();
|
||||
|
||||
// List all pending ops for all timelines, cancel them
|
||||
for (timeline_id, timeline) in timeline_list.iter() {
|
||||
let sk_iter = timeline
|
||||
.sk_set
|
||||
.iter()
|
||||
.chain(timeline.new_sk_set.iter().flatten())
|
||||
.map(|id| NodeId(*id as u64));
|
||||
for sk_id in sk_iter.clone() {
|
||||
locked
|
||||
.safekeeper_reconcilers
|
||||
.cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id));
|
||||
}
|
||||
sk_list.extend(sk_iter);
|
||||
}
|
||||
|
||||
// unwrap is safe: we return above for an empty timeline list
|
||||
let max_generation = timeline_list
|
||||
.iter()
|
||||
.map(|(_tl_id, tl)| tl.generation as u32)
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
for sk_id in sk_list {
|
||||
let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
|
||||
tracing::warn!("Couldn't find safekeeper with id {sk_id}");
|
||||
continue;
|
||||
};
|
||||
// Add pending op for tenant deletion
|
||||
let req = ScheduleRequest {
|
||||
generation: max_generation,
|
||||
host_list: Vec::new(),
|
||||
kind: SafekeeperTimelineOpKind::Delete,
|
||||
safekeeper: Box::new(safekeeper.clone()),
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
};
|
||||
locked.safekeeper_reconcilers.schedule_request(self, req);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Choose safekeepers for the new timeline: 3 in different azs.
|
||||
pub(crate) async fn safekeepers_for_new_timeline(
|
||||
&self,
|
||||
|
||||
Reference in New Issue
Block a user