diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index 424cd89221..b5e407995d 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -120,6 +120,12 @@ impl Client { resp.json().await.map_err(Error::ReceiveBody) } + pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result { + let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id); + let resp = self.request(Method::DELETE, &uri, ()).await?; + resp.json().await.map_err(Error::ReceiveBody) + } + pub async fn bump_timeline_term( &self, tenant_id: TenantId, diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 85d9c574a1..0e6f80a060 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1367,6 +1367,34 @@ impl Persistence { Ok(timeline_from_db) } + + /// Loads a list of all timelines from database. + pub(crate) async fn list_timelines_for_tenant( + &self, + tenant_id: TenantId, + ) -> DatabaseResult> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timelines = self + .with_measured_conn(DatabaseOperation::GetTimeline, move |conn| { + Box::pin(async move { + let timelines: Vec = dsl::timelines + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .load(conn) + .await?; + Ok(timelines) + }) + }) + .await?; + + let timelines = timelines + .into_iter() + .map(TimelineFromDb::into_persistence) + .collect(); + Ok(timelines) + } + /// Persist pending op. Returns if it was newly inserted. If it wasn't, we haven't done any writes. pub(crate) async fn insert_pending_op( &self, @@ -1409,7 +1437,7 @@ impl Persistence { pub(crate) async fn remove_pending_op( &self, tenant_id: TenantId, - timeline_id: TimelineId, + timeline_id: Option, sk_id: NodeId, generation: u32, ) -> DatabaseResult<()> { @@ -1418,10 +1446,11 @@ impl Persistence { let tenant_id = &tenant_id; let timeline_id = &timeline_id; self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| { + let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default(); Box::pin(async move { diesel::delete(dsl::safekeeper_timeline_pending_ops) .filter(dsl::tenant_id.eq(tenant_id.to_string())) - .filter(dsl::timeline_id.eq(timeline_id.to_string())) + .filter(dsl::timeline_id.eq(timeline_id_str)) .filter(dsl::sk_id.eq(sk_id.0 as i64)) .filter(dsl::generation.eq(generation as i32)) .execute(conn) @@ -1461,6 +1490,34 @@ impl Persistence { Ok(timeline_from_db) } + + /// Delete all pending ops for the given timeline. + /// + /// Use this only at timeline deletion, otherwise use generation based APIs + pub(crate) async fn remove_pending_ops_for_timeline( + &self, + tenant_id: TenantId, + timeline_id: Option, + ) -> DatabaseResult<()> { + use crate::schema::safekeeper_timeline_pending_ops::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { + let timeline_id_str = timeline_id.map(|tid| tid.to_string()).unwrap_or_default(); + Box::pin(async move { + diesel::delete(dsl::safekeeper_timeline_pending_ops) + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .filter(dsl::timeline_id.eq(timeline_id_str)) + .execute(conn) + .await?; + Ok(()) + }) + }) + .await?; + + Ok(()) + } } pub(crate) fn load_certs() -> anyhow::Result> { diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index b30237e404..98e3f74071 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -98,6 +98,18 @@ impl SafekeeperClient { ) } + pub(crate) async fn delete_tenant( + &self, + tenant_id: TenantId, + ) -> Result { + measured_request!( + "delete_tenant", + crate::metrics::Method::Delete, + &self.node_id_label, + self.inner.delete_tenant(tenant_id).await + ) + } + pub(crate) async fn pull_timeline( &self, req: &PullTimelineRequest, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 087b77a957..82efbe94f7 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3323,7 +3323,10 @@ impl Service { } } - pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result { + pub(crate) async fn tenant_delete( + self: &Arc, + tenant_id: TenantId, + ) -> Result { let _tenant_lock = trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await; @@ -3431,6 +3434,11 @@ impl Service { ); }; + // Delete the tenant from safekeepers (if needed) + self.tenant_delete_safekeepers(tenant_id) + .instrument(tracing::info_span!("tenant_delete_safekeepers", %tenant_id)) + .await?; + // Success is represented as 404, to imitate the existing pageserver deletion API Ok(StatusCode::NOT_FOUND) } diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index 4fa465c307..3f92bb391e 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -51,6 +51,22 @@ impl SafekeeperReconcilers { handle.cancel.cancel(); } } + /// Cancel ongoing reconciles for the given timeline + /// + /// Specifying `None` here only removes reconciles for the tenant-global reconciliation, + /// instead of doing this for all timelines of the tenant. + /// + /// Callers must remove the reconciles from the db manually + pub(crate) fn cancel_reconciles_for_timeline( + &mut self, + node_id: NodeId, + tenant_id: TenantId, + timeline_id: Option, + ) { + if let Some(handle) = self.reconcilers.get(&node_id) { + handle.cancel_reconciliation(tenant_id, timeline_id); + } + } } /// Initial load of the pending operations from the db @@ -73,12 +89,21 @@ pub(crate) async fn load_schedule_requests( }; let sk = Box::new(sk.clone()); let tenant_id = TenantId::from_str(&op_persist.tenant_id)?; - let timeline_id = TimelineId::from_str(&op_persist.timeline_id)?; + let timeline_id = if !op_persist.timeline_id.is_empty() { + Some(TimelineId::from_str(&op_persist.timeline_id)?) + } else { + None + }; let host_list = match op_persist.op_kind { SafekeeperTimelineOpKind::Delete => Vec::new(), SafekeeperTimelineOpKind::Exclude => Vec::new(), SafekeeperTimelineOpKind::Pull => { // TODO this code is super hacky, it doesn't take migrations into account + let Some(timeline_id) = timeline_id else { + anyhow::bail!( + "timeline_id is empty for `pull` schedule request for {tenant_id}" + ); + }; let timeline_persist = service .persistence .get_timeline(tenant_id, timeline_id) @@ -129,14 +154,15 @@ pub(crate) struct ScheduleRequest { pub(crate) safekeeper: Box, pub(crate) host_list: Vec<(NodeId, String)>, pub(crate) tenant_id: TenantId, - pub(crate) timeline_id: TimelineId, + pub(crate) timeline_id: Option, pub(crate) generation: u32, pub(crate) kind: SafekeeperTimelineOpKind, } struct ReconcilerHandle { tx: UnboundedSender<(ScheduleRequest, Arc)>, - ongoing_tokens: Arc>>, + #[allow(clippy::type_complexity)] + ongoing_tokens: Arc), Arc>>, cancel: CancellationToken, } @@ -145,7 +171,7 @@ impl ReconcilerHandle { fn new_token_slot( &self, tenant_id: TenantId, - timeline_id: TimelineId, + timeline_id: Option, ) -> Arc { let entry = self.ongoing_tokens.entry((tenant_id, timeline_id)); if let Entry::Occupied(entry) = &entry { @@ -154,6 +180,12 @@ impl ReconcilerHandle { } entry.insert(Arc::new(self.cancel.child_token())).clone() } + /// Cancel an ongoing reconciliation + fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option) { + if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) { + cancel.cancel(); + } + } fn schedule_reconcile(&self, req: ScheduleRequest) { let cancel = self.new_token_slot(req.tenant_id, req.timeline_id); let hostname = req.safekeeper.skp.host.clone(); @@ -206,7 +238,7 @@ impl SafekeeperReconciler { "reconcile_one", ?kind, %tenant_id, - %timeline_id + ?timeline_id )) .await; } @@ -215,6 +247,12 @@ impl SafekeeperReconciler { let req_host = req.safekeeper.skp.host.clone(); match req.kind { SafekeeperTimelineOpKind::Pull => { + let Some(timeline_id) = req.timeline_id else { + tracing::warn!( + "ignoring invalid schedule request: timeline_id is empty for `pull`" + ); + return; + }; let our_id = req.safekeeper.get_id(); let http_hosts = req .host_list @@ -225,7 +263,7 @@ impl SafekeeperReconciler { let pull_req = PullTimelineRequest { http_hosts, tenant_id: req.tenant_id, - timeline_id: req.timeline_id, + timeline_id, }; self.reconcile_inner( req, @@ -243,7 +281,12 @@ impl SafekeeperReconciler { SafekeeperTimelineOpKind::Exclude => { // TODO actually exclude instead of delete here let tenant_id = req.tenant_id; - let timeline_id = req.timeline_id; + let Some(timeline_id) = req.timeline_id else { + tracing::warn!( + "ignoring invalid schedule request: timeline_id is empty for `exclude`" + ); + return; + }; self.reconcile_inner( req, async |client| client.delete_timeline(tenant_id, timeline_id).await, @@ -256,16 +299,27 @@ impl SafekeeperReconciler { } SafekeeperTimelineOpKind::Delete => { let tenant_id = req.tenant_id; - let timeline_id = req.timeline_id; - self.reconcile_inner( - req, - async |client| client.delete_timeline(tenant_id, timeline_id).await, - |_resp| { - tracing::info!("deleted timeline from {req_host}"); - }, - req_cancel, - ) - .await; + if let Some(timeline_id) = req.timeline_id { + self.reconcile_inner( + req, + async |client| client.delete_timeline(tenant_id, timeline_id).await, + |_resp| { + tracing::info!("deleted timeline from {req_host}"); + }, + req_cancel, + ) + .await; + } else { + self.reconcile_inner( + req, + async |client| client.delete_tenant(tenant_id).await, + |_resp| { + tracing::info!("deleted tenant from {req_host}"); + }, + req_cancel, + ) + .await; + } } } } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index b5fb00a469..bcfd035883 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -8,6 +9,7 @@ use crate::persistence::{ DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence, }; use crate::safekeeper::Safekeeper; +use anyhow::Context; use http_utils::error::ApiError; use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy}; use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo}; @@ -265,7 +267,8 @@ impl Service { .get(&sk.id) .ok_or_else(|| { ApiError::InternalServerError(anyhow::anyhow!( - "Couldn't find safekeeper with id {remaining_id} to pull from" + "Couldn't find safekeeper with id {} to pull from", + sk.id )) })? .base_url(), @@ -279,7 +282,7 @@ impl Service { safekeeper: Box::new(sk.clone()), host_list, tenant_id, - timeline_id, + timeline_id: Some(timeline_id), generation: timeline_persist.generation as u32, kind: crate::persistence::SafekeeperTimelineOpKind::Pull, }; @@ -340,7 +343,7 @@ impl Service { // we don't use this for this kind, put a dummy value host_list: Vec::new(), tenant_id, - timeline_id, + timeline_id: Some(timeline_id), generation: tl.generation as u32, kind, }; @@ -350,6 +353,86 @@ impl Service { Ok(()) } + /// Perform tenant deletion on safekeepers. + pub(super) async fn tenant_delete_safekeepers( + self: &Arc, + tenant_id: TenantId, + ) -> Result<(), ApiError> { + let timeline_list = self + .persistence + .list_timelines_for_tenant(tenant_id) + .await?; + + if timeline_list.is_empty() { + // Early exit: the tenant is either empty or not migrated to the storcon yet + tracing::info!("Skipping tenant delete as the timeline doesn't exist in db"); + return Ok(()); + } + + let timeline_list = timeline_list + .into_iter() + .map(|timeline| { + let timeline_id = TimelineId::from_str(&timeline.timeline_id) + .context("timeline id loaded from db") + .map_err(ApiError::InternalServerError)?; + Ok((timeline_id, timeline)) + }) + .collect::, ApiError>>()?; + + // Remove pending ops from db. + // We cancel them in a later iteration once we hold the state lock. + for (timeline_id, _timeline) in timeline_list.iter() { + self.persistence + .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id)) + .await?; + } + + let mut locked = self.inner.write().unwrap(); + + // The list of safekeepers that have any of the timelines + let mut sk_list = HashSet::new(); + + // List all pending ops for all timelines, cancel them + for (timeline_id, timeline) in timeline_list.iter() { + let sk_iter = timeline + .sk_set + .iter() + .chain(timeline.new_sk_set.iter().flatten()) + .map(|id| NodeId(*id as u64)); + for sk_id in sk_iter.clone() { + locked + .safekeeper_reconcilers + .cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id)); + } + sk_list.extend(sk_iter); + } + + // unwrap is safe: we return above for an empty timeline list + let max_generation = timeline_list + .iter() + .map(|(_tl_id, tl)| tl.generation as u32) + .max() + .unwrap(); + + for sk_id in sk_list { + let Some(safekeeper) = locked.safekeepers.get(&sk_id) else { + tracing::warn!("Couldn't find safekeeper with id {sk_id}"); + continue; + }; + // Add pending op for tenant deletion + let req = ScheduleRequest { + generation: max_generation, + host_list: Vec::new(), + kind: SafekeeperTimelineOpKind::Delete, + safekeeper: Box::new(safekeeper.clone()), + tenant_id, + timeline_id: None, + }; + locked.safekeeper_reconcilers.schedule_request(self, req); + } + Ok(()) + } + /// Choose safekeepers for the new timeline: 3 in different azs. pub(crate) async fn safekeepers_for_new_timeline( &self,