diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index c927b7c366..99b1a1e887 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -1369,6 +1369,66 @@ impl Persistence {
         Ok(timeline_from_db)
     }
 
+    /// Set `deleted_at` for the given timeline to the current time.
+    ///
+    /// A timeline that is not present in the db is not an error: the update then changes nothing.
+    pub(crate) async fn timeline_set_deleted_at(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> DatabaseResult<()> {
+        use crate::schema::timelines;
+
+        let deletion_time = chrono::Utc::now();
+        // NOTE(review): an UPDATE recorded under `DatabaseOperation::InsertTimeline` -- confirm.
+        self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
+            Box::pin(async move {
+                let updated = diesel::update(timelines::table)
+                    .filter(timelines::tenant_id.eq(tenant_id.to_string()))
+                    .filter(timelines::timeline_id.eq(timeline_id.to_string()))
+                    .set(timelines::deleted_at.eq(Some(deletion_time)))
+                    .execute(conn)
+                    .await?;
+
+                match updated {
+                    // No matching row (nothing to mark) and exactly one row are both fine.
+                    0 | 1 => Ok(()),
+                    _ => Err(DatabaseError::Logical(format!(
+                        "unexpected number of rows ({updated})"
+                    ))),
+                }
+            })
+        })
+        .await
+    }
+
+    /// Delete the timeline's row from the db.
+    ///
+    /// Only works if `deleted_at` is set, so you should call [`Self::timeline_set_deleted_at`] before.
+    /// The number of deleted rows is not checked: `Ok(())` is returned even if nothing matched.
+    pub(crate) async fn delete_timeline(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> DatabaseResult<()> {
+        use crate::schema::timelines::dsl;
+
+        // NOTE(review): a DELETE recorded under `DatabaseOperation::GetTimeline` -- confirm.
+        self.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
+            Box::pin(async move {
+                diesel::delete(dsl::timelines)
+                    .filter(dsl::tenant_id.eq(tenant_id.to_string()))
+                    .filter(dsl::timeline_id.eq(timeline_id.to_string()))
+                    // Only rows already marked as deleted may be removed.
+                    .filter(dsl::deleted_at.is_not_null())
+                    .execute(conn)
+                    .await?;
+                Ok(())
+            })
+        })
+        .await
+    }
+
     /// Loads a list of all timelines from database.
     pub(crate) async fn list_timelines_for_tenant(
         &self,
@@ -1491,6 +1551,33 @@ impl Persistence {
 
         Ok(timeline_from_db)
     }
+    /// List pending operations for a given timeline (including tenant-global ones).
+    ///
+    /// Tenant-global ones are matched by an empty `timeline_id` column.
+    pub(crate) async fn list_pending_ops_for_timeline(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
+        use crate::schema::safekeeper_timeline_pending_ops::dsl;
+
+        self.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
+            Box::pin(async move {
+                let pending_ops: Vec<TimelinePendingOpPersistence> =
+                    dsl::safekeeper_timeline_pending_ops
+                        .filter(dsl::tenant_id.eq(tenant_id.to_string()))
+                        .filter(
+                            dsl::timeline_id
+                                .eq(timeline_id.to_string())
+                                .or(dsl::timeline_id.eq("")),
+                        )
+                        .load(conn)
+                        .await?;
+                Ok(pending_ops)
+            })
+        })
+        .await
+    }
 
     /// Delete all pending ops for the given timeline.
     ///
@@ -1974,7 +2061,7 @@ impl ToSql for LsnWrapper {
     }
 }
 
-#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
+#[derive(Insertable, AsChangeset, Clone)]
 #[diesel(table_name = crate::schema::timelines)]
 pub(crate) struct TimelinePersistence {
     pub(crate) tenant_id: String,
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index a60aa6ca53..8e752a8ff1 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -160,9 +160,8 @@ pub(crate) struct ScheduleRequest {
 }
 
 struct ReconcilerHandle {
-    tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
-    #[allow(clippy::type_complexity)]
-    ongoing_tokens: Arc), Arc>>,
+    tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
+    ongoing_tokens: Arc), CancellationToken>>,
     cancel: CancellationToken,
 }
 
@@ -172,13 +171,13 @@ impl ReconcilerHandle {
         &self,
         tenant_id: TenantId,
         timeline_id: Option<TimelineId>,
-    ) -> Arc<CancellationToken> {
+    ) -> CancellationToken {
         let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
         if let
Entry::Occupied(entry) = &entry {
             let cancel: &CancellationToken = entry.get();
             cancel.cancel();
         }
-        entry.insert(Arc::new(self.cancel.child_token())).clone()
+        entry.insert(self.cancel.child_token()).clone()
     }
     /// Cancel an ongoing reconciliation
     fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
@@ -197,7 +196,7 @@ impl ReconcilerHandle {
 
 pub(crate) struct SafekeeperReconciler {
     service: Arc<Service>,
-    rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
+    rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
     cancel: CancellationToken,
 }
 
@@ -243,7 +242,7 @@ impl SafekeeperReconciler {
                 .await;
         }
     }
-    async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
+    async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
         let req_host = req.safekeeper.skp.host.clone();
         match req.kind {
             SafekeeperTimelineOpKind::Pull => {
@@ -300,36 +299,102 @@ impl SafekeeperReconciler {
             SafekeeperTimelineOpKind::Delete => {
                 let tenant_id = req.tenant_id;
                 if let Some(timeline_id) = req.timeline_id {
-                    self.reconcile_inner(
+                    let deleted = self.reconcile_inner(
                         req,
                         async |client| client.delete_timeline(tenant_id, timeline_id).await,
                         |_resp| {
-                            tracing::info!("deleted timeline from {req_host}");
+                            tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
                         },
                         req_cancel,
                     )
                     .await;
+                    if deleted {
+                        self.delete_timeline_from_db(tenant_id, timeline_id).await;
+                    }
                 } else {
-                    self.reconcile_inner(
-                        req,
-                        async |client| client.delete_tenant(tenant_id).await,
-                        |_resp| {
-                            tracing::info!("deleted tenant from {req_host}");
-                        },
-                        req_cancel,
-                    )
-                    .await;
+                    let deleted = self
+                        .reconcile_inner(
+                            req,
+                            async |client| client.delete_tenant(tenant_id).await,
+                            |_resp| {
+                                tracing::info!(%tenant_id, "deleted tenant from {req_host}");
+                            },
+                            req_cancel,
+                        )
+                        .await;
+                    if deleted {
+                        self.delete_tenant_timelines_from_db(tenant_id).await;
+                    }
                 }
             }
         }
     }
+    /// Deletes the timeline from the db, unless it still has pending ops (or listing them fails).
+    ///
+    /// Failures are only logged, nothing is returned to the caller.
+    async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
+        match self
+            .service
+            .persistence
+            .list_pending_ops_for_timeline(tenant_id, timeline_id)
+            .await
+        {
+            Ok(list) => {
+                if !list.is_empty() {
+                    tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
+                    return;
+                }
+            }
+            Err(e) => {
+                tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
+                return;
+            }
+        }
+        tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
+        // In theory we could crash right after deleting the op from the db and right before reaching this,
+        // but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
+        if let Err(err) = self
+            .service
+            .persistence
+            .delete_timeline(tenant_id, timeline_id)
+            .await
+        {
+            tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
+        }
+    }
+    /// Calls [`Self::delete_timeline_from_db`] for each timeline that the db lists for the tenant.
+    ///
+    /// Timelines whose id can't be parsed are skipped with a warning.
+    async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
+        let timeline_list = match self
+            .service
+            .persistence
+            .list_timelines_for_tenant(tenant_id)
+            .await
+        {
+            Ok(timeline_list) => timeline_list,
+            Err(e) => {
+                tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
+                return;
+            }
+        };
+        for timeline in timeline_list {
+            let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
+                tracing::warn!(%tenant_id, "Invalid timeline ID in database {}", timeline.timeline_id);
+                continue;
+            };
+            self.delete_timeline_from_db(tenant_id, timeline_id).await;
+        }
+    }
+    /// Returns whether the reconciliation happened successfully
     async fn reconcile_inner(
         &self,
         req: ScheduleRequest,
         closure: impl Fn(SafekeeperClient) -> F,
         log_success: impl FnOnce(T) -> U,
-        req_cancel: Arc<CancellationToken>,
-    ) where
+        req_cancel: CancellationToken,
+    ) -> bool
+    where
         F: Future>,
     {
         let jwt = self
@@ -373,11 +438,11 @@ impl SafekeeperReconciler {
                         req.safekeeper.skp.host
                     );
                 }
-                return;
+                return true;
             }
             Err(mgmt_api::Error::Cancelled) => {
                 // On
cancellation, the code that issued it will take care of removing db entries (if needed)
-                return;
+                return false;
             }
             Err(e) => {
                 tracing::info!(
diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index 557c684f6b..7f2c63b9af 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -313,25 +313,33 @@ impl Service {
             );
             return Ok(());
         };
+        self.persistence
+            .timeline_set_deleted_at(tenant_id, timeline_id)
+            .await?;
         let all_sks = tl
             .new_sk_set
             .iter()
-            .flat_map(|sks| {
-                sks.iter()
-                    .map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
-            })
-            .chain(
-                tl.sk_set
-                    .iter()
-                    .map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
-            )
-            .collect::<HashMap<_, _>>();
+            .flatten()
+            .chain(tl.sk_set.iter())
+            .copied()
+            .collect::<HashSet<_>>();
 
         // Schedule reconciliations
+        for &sk_id in all_sks.iter() {
+            let pending_op = TimelinePendingOpPersistence {
+                tenant_id: tenant_id.to_string(),
+                timeline_id: timeline_id.to_string(),
+                generation: tl.generation,
+                op_kind: SafekeeperTimelineOpKind::Delete,
+                sk_id,
+            };
+            tracing::info!("writing pending op for sk id {sk_id}");
+            self.persistence.insert_pending_op(pending_op).await?;
+        }
         {
             let mut locked = self.inner.write().unwrap();
-            for (sk_id, kind) in all_sks {
+            for sk_id in all_sks {
                 let sk_id = NodeId(sk_id as u64);
                 let Some(sk) = locked.safekeepers.get(&sk_id) else {
                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
                         "Couldn't find safekeeper with id {sk_id}"
@@ -345,7 +353,7 @@ impl Service {
                     tenant_id,
                     timeline_id: Some(timeline_id),
                     generation: tl.generation as u32,
-                    kind,
+                    kind: SafekeeperTimelineOpKind::Delete,
                 };
                 locked.safekeeper_reconcilers.schedule_request(self, req);
             }
@@ -379,32 +387,50 @@ impl
Service {
             })
             .collect::<Result<Vec<_>, ApiError>>()?;
 
-        // Remove pending ops from db.
+        // Remove pending ops from db, and set `deleted_at`.
         // We cancel them in a later iteration once we hold the state lock.
         for (timeline_id, _timeline) in timeline_list.iter() {
             self.persistence
                 .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
                 .await?;
+            self.persistence
+                .timeline_set_deleted_at(tenant_id, *timeline_id)
+                .await?;
         }
 
-        let mut locked = self.inner.write().unwrap();
-
         // The list of safekeepers that have any of the timelines
         let mut sk_list = HashSet::new();
 
-        // List all pending ops for all timelines, cancel them
-        for (timeline_id, timeline) in timeline_list.iter() {
+        // Collect the safekeepers of all timelines
+        for (_timeline_id, timeline) in timeline_list.iter() {
             let sk_iter = timeline
                 .sk_set
                 .iter()
                 .chain(timeline.new_sk_set.iter().flatten())
                 .map(|id| NodeId(*id as u64));
-            for sk_id in sk_iter.clone() {
+            sk_list.extend(sk_iter);
+        }
+
+        for &sk_id in sk_list.iter() {
+            let pending_op = TimelinePendingOpPersistence {
+                tenant_id: tenant_id.to_string(),
+                timeline_id: String::new(), // presumably "" marks a tenant-wide op -- confirm
+                generation: i32::MAX,
+                op_kind: SafekeeperTimelineOpKind::Delete,
+                sk_id: sk_id.0 as i64,
+            };
+            tracing::info!("writing pending op for sk id {sk_id}");
+            self.persistence.insert_pending_op(pending_op).await?;
+        }
+
+        let mut locked = self.inner.write().unwrap();
+
+        for (timeline_id, _timeline) in timeline_list.iter() {
+            for sk_id in sk_list.iter() {
                 locked
                     .safekeeper_reconcilers
-                    .cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id));
+                    .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
             }
-            sk_list.extend(sk_iter);
         }
 
         // unwrap is safe: we return above for an empty timeline list