diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index 71c73a0112..17bb132982 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -1,4 +1,9 @@
-use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::AtomicU64},
+    time::Duration,
+};
 
 use clashmap::{ClashMap, Entry};
 use safekeeper_api::models::PullTimelineRequest;
@@ -169,10 +174,17 @@ pub(crate) struct ScheduleRequest {
     pub(crate) kind: SafekeeperTimelineOpKind,
 }
 
+/// A way to keep ongoing/queued reconcile requests apart
+#[derive(Copy, Clone, PartialEq, Eq)]
+struct TokenId(u64);
+
+type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
+
 /// Handle to per safekeeper reconciler.
 struct ReconcilerHandle {
-    tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
-    ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
+    tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
+    ongoing_tokens: Arc<OngoingTokens>,
+    token_id_counter: AtomicU64,
     cancel: CancellationToken,
 }
 
@@ -185,24 +197,28 @@ impl ReconcilerHandle {
         &self,
         tenant_id: TenantId,
         timeline_id: Option<TimelineId>,
-    ) -> CancellationToken {
+    ) -> (CancellationToken, TokenId) {
+        let token_id = self
+            .token_id_counter
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        let token_id = TokenId(token_id);
         let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
         if let Entry::Occupied(entry) = &entry {
-            let cancel: &CancellationToken = entry.get();
+            let (cancel, _) = entry.get();
             cancel.cancel();
         }
-        entry.insert(self.cancel.child_token()).clone()
+        entry.insert((self.cancel.child_token(), token_id)).clone()
     }
     /// Cancel an ongoing reconciliation
     fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
-        if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
+        if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
            cancel.cancel();
        }
    }
    fn schedule_reconcile(&self, req: ScheduleRequest) {
-        let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
+        let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
        let hostname = req.safekeeper.skp.host.clone();
-        if let Err(err) = self.tx.send((req, cancel)) {
+        if let Err(err) = self.tx.send((req, cancel, token_id)) {
            tracing::info!("scheduling request onto {hostname} returned error: {err}");
        }
    }
@@ -211,13 +227,14 @@ impl ReconcilerHandle {
 pub(crate) struct SafekeeperReconciler {
     inner: SafekeeperReconcilerInner,
     concurrency_limiter: Arc<Semaphore>,
-    rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
+    rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
     cancel: CancellationToken,
 }
 
 /// Thin wrapper over `Service` to not clutter its inherent functions
 #[derive(Clone)]
 struct SafekeeperReconcilerInner {
+    ongoing_tokens: Arc<OngoingTokens>,
     service: Arc<Service>,
 }
 
@@ -226,15 +243,20 @@ impl SafekeeperReconciler {
         // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
         let (tx, rx) = mpsc::unbounded_channel();
         let concurrency = service.config.safekeeper_reconciler_concurrency;
+        let ongoing_tokens = Arc::new(ClashMap::new());
         let mut reconciler = SafekeeperReconciler {
-            inner: SafekeeperReconcilerInner { service },
+            inner: SafekeeperReconcilerInner {
+                service,
+                ongoing_tokens: ongoing_tokens.clone(),
+            },
             rx,
             concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
             cancel: cancel.clone(),
         };
         let handle = ReconcilerHandle {
             tx,
-            ongoing_tokens: Arc::new(ClashMap::new()),
+            ongoing_tokens,
+            token_id_counter: AtomicU64::new(0),
             cancel,
         };
         tokio::spawn(async move { reconciler.run().await });
@@ -246,7 +268,9 @@ impl SafekeeperReconciler {
                 req = self.rx.recv() => req,
                 _ = self.cancel.cancelled() => break,
             };
-            let Some((req, req_cancel)) = req else { break };
+            let Some((req, req_cancel, req_token_id)) = req else {
+                break;
+            };
 
             let permit_res = tokio::select! {
                 req = self.concurrency_limiter.clone().acquire_owned() => req,
@@ -265,7 +289,7 @@ impl SafekeeperReconciler {
                 let timeline_id = req.timeline_id;
                 let node_id = req.safekeeper.skp.id;
                 inner
-                    .reconcile_one(req, req_cancel)
+                    .reconcile_one(req, req_cancel, req_token_id)
                     .instrument(tracing::info_span!(
                         "reconcile_one",
                         ?kind,
@@ -280,8 +304,14 @@ impl SafekeeperReconciler {
 }
 
 impl SafekeeperReconcilerInner {
-    async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
+    async fn reconcile_one(
+        &self,
+        req: ScheduleRequest,
+        req_cancel: CancellationToken,
+        req_token_id: TokenId,
+    ) {
         let req_host = req.safekeeper.skp.host.clone();
+        let success;
         match req.kind {
             SafekeeperTimelineOpKind::Pull => {
                 let Some(timeline_id) = req.timeline_id else {
@@ -302,19 +332,22 @@ impl SafekeeperReconcilerInner {
                     tenant_id: req.tenant_id,
                     timeline_id,
                 };
-                self.reconcile_inner(
-                    req,
-                    async |client| client.pull_timeline(&pull_req).await,
-                    |resp| {
-                        if let Some(host) = resp.safekeeper_host {
-                            tracing::info!("pulled timeline from {host} onto {req_host}");
-                        } else {
-                            tracing::info!("timeline already present on safekeeper on {req_host}");
-                        }
-                    },
-                    req_cancel,
-                )
-                .await;
+                success = self
+                    .reconcile_inner(
+                        &req,
+                        async |client| client.pull_timeline(&pull_req).await,
+                        |resp| {
+                            if let Some(host) = resp.safekeeper_host {
+                                tracing::info!("pulled timeline from {host} onto {req_host}");
+                            } else {
+                                tracing::info!(
+                                    "timeline already present on safekeeper on {req_host}"
+                                );
+                            }
+                        },
+                        req_cancel,
+                    )
+                    .await;
             }
             SafekeeperTimelineOpKind::Exclude => {
                 // TODO actually exclude instead of delete here
@@ -325,22 +358,23 @@ impl SafekeeperReconcilerInner {
                    );
                    return;
                };
-                self.reconcile_inner(
-                    req,
-                    async |client| client.delete_timeline(tenant_id, timeline_id).await,
-                    |_resp| {
-                        tracing::info!("deleted timeline from {req_host}");
-                    },
-                    req_cancel,
-                )
-                .await;
+                success = self
+                    .reconcile_inner(
+                        &req,
+                        async |client| client.delete_timeline(tenant_id, timeline_id).await,
+                        |_resp| {
+                            tracing::info!("deleted timeline from {req_host}");
+                        },
+                        req_cancel,
+                    )
+                    .await;
             }
             SafekeeperTimelineOpKind::Delete => {
                 let tenant_id = req.tenant_id;
                 if let Some(timeline_id) = req.timeline_id {
-                    let deleted = self
+                    success = self
                         .reconcile_inner(
-                            req,
+                            &req,
                             async |client| client.delete_timeline(tenant_id, timeline_id).await,
                             |_resp| {
                                 tracing::info!("deleted timeline from {req_host}");
@@ -348,13 +382,13 @@ impl SafekeeperReconcilerInner {
                             req_cancel,
                         )
                        .await;
-                    if deleted {
+                    if success {
                        self.delete_timeline_from_db(tenant_id, timeline_id).await;
                    }
                } else {
-                    let deleted = self
+                    success = self
                        .reconcile_inner(
-                            req,
+                            &req,
                            async |client| client.delete_tenant(tenant_id).await,
                            |_resp| {
                                tracing::info!(%tenant_id, "deleted tenant from {req_host}");
@@ -362,12 +396,21 @@ impl SafekeeperReconcilerInner {
                            req_cancel,
                        )
                        .await;
-                    if deleted {
+                    if success {
                        self.delete_tenant_timelines_from_db(tenant_id).await;
                    }
                }
            }
        }
+        if success {
+            self.ongoing_tokens.remove_if(
+                &(req.tenant_id, req.timeline_id),
+                |_ttid, (_cancel, token_id)| {
+                    // Ensure that this request is indeed the request we just finished and not a new one
+                    req_token_id == *token_id
+                },
+            );
+        }
     }
     async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
         match self
@@ -421,10 +464,10 @@ impl SafekeeperReconcilerInner {
             self.delete_timeline_from_db(tenant_id, timeline_id).await;
         }
     }
-    /// Returns whether the reconciliation happened successfully
+    /// Returns whether the reconciliation happened successfully (or we got cancelled)
     async fn reconcile_inner<T, F, U>(
         &self,
-        req: ScheduleRequest,
+        req: &ScheduleRequest,
         closure: impl Fn(SafekeeperClient) -> F,
         log_success: impl FnOnce(T) -> U,
         req_cancel: CancellationToken,
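Review note: the crux of the change is that a finishing reconcile must not clear the `ongoing_tokens` slot for a `(TenantId, Option<TimelineId>)` key that a newer request has already re-claimed; the `remove_if` guard on `req_token_id == *token_id` enforces this. Below is a minimal, std-only sketch of that guard under stated assumptions: the `OngoingTokens`/`finish` names and string keys are hypothetical stand-ins for the patch's `ClashMap` of cancellation tokens keyed by tenant/timeline, not code from this diff.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic id distinguishing queued reconcile attempts for the same key.
#[derive(Copy, Clone, PartialEq, Eq)]
struct TokenId(u64);

/// Simplified stand-in for the ongoing-tokens map: key -> token id of the
/// latest scheduled attempt (the real map also stores a CancellationToken).
struct OngoingTokens {
    counter: AtomicU64,
    slots: Mutex<HashMap<&'static str, TokenId>>,
}

impl OngoingTokens {
    fn new() -> Self {
        Self {
            counter: AtomicU64::new(0),
            slots: Mutex::new(HashMap::new()),
        }
    }

    /// Claim a slot for `key`, superseding any previous attempt.
    fn new_token_slot(&self, key: &'static str) -> TokenId {
        let id = TokenId(self.counter.fetch_add(1, Ordering::Relaxed));
        self.slots.lock().unwrap().insert(key, id);
        id
    }

    /// Remove the slot only if it still belongs to the finished attempt,
    /// mirroring the `remove_if` guard added by the patch.
    fn finish(&self, key: &'static str, finished: TokenId) -> bool {
        let mut slots = self.slots.lock().unwrap();
        let is_current = slots.get(key) == Some(&finished);
        if is_current {
            // Only the attempt that still owns the slot may evict it.
            slots.remove(key);
        }
        is_current
    }
}

fn main() {
    let tokens = OngoingTokens::new();
    let first = tokens.new_token_slot("tenant-a/timeline-1");
    // A later request for the same key supersedes the first attempt.
    let second = tokens.new_token_slot("tenant-a/timeline-1");

    // The superseded attempt finishing must not evict the newer slot...
    assert!(!tokens.finish("tenant-a/timeline-1", first));
    // ...while the currently registered attempt may remove it.
    assert!(tokens.finish("tenant-a/timeline-1", second));
}
```

Without such a check, a plain `remove` on success would drop the cancellation token belonging to a newer queued request, so that request could no longer be cancelled through the handle.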