storcon: remove finished safekeeper reconciliations from in-memory hashmap (#11876)

## Problem

Currently there is a memory leak, in that finished safekeeper
reconciliations leave a cancellation token behind which is never cleaned
up.

## Summary of changes

The change adds cleanup after finishing of a reconciliation. In order to
ensure we remove the correct cancellation token, and we haven't raced
with another reconciliation, we introduce a `TokenId` counter to tell
tokens apart.

Part of https://github.com/neondatabase/neon/issues/11670
This commit is contained in:
Arpad Müller
2025-05-09 15:34:22 +02:00
committed by GitHub
parent 93b964f829
commit 33abfc2b74

View File

@@ -1,4 +1,9 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, atomic::AtomicU64},
time::Duration,
};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
@@ -169,10 +174,17 @@ pub(crate) struct ScheduleRequest {
pub(crate) kind: SafekeeperTimelineOpKind,
}
/// A way to keep ongoing/queued reconcile requests apart
#[derive(Copy, Clone, PartialEq, Eq)]
struct TokenId(u64);
type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
/// Handle to per safekeeper reconciler.
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
ongoing_tokens: Arc<OngoingTokens>,
token_id_counter: AtomicU64,
cancel: CancellationToken,
}
@@ -185,24 +197,28 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
) -> CancellationToken {
) -> (CancellationToken, TokenId) {
let token_id = self
.token_id_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let token_id = TokenId(token_id);
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
let (cancel, _) = entry.get();
cancel.cancel();
}
entry.insert(self.cancel.child_token()).clone()
entry.insert((self.cancel.child_token(), token_id)).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
cancel.cancel();
}
}
fn schedule_reconcile(&self, req: ScheduleRequest) {
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
let hostname = req.safekeeper.skp.host.clone();
if let Err(err) = self.tx.send((req, cancel)) {
if let Err(err) = self.tx.send((req, cancel, token_id)) {
tracing::info!("scheduling request onto {hostname} returned error: {err}");
}
}
@@ -211,13 +227,14 @@ impl ReconcilerHandle {
pub(crate) struct SafekeeperReconciler {
inner: SafekeeperReconcilerInner,
concurrency_limiter: Arc<Semaphore>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
cancel: CancellationToken,
}
/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
ongoing_tokens: Arc<OngoingTokens>,
service: Arc<Service>,
}
@@ -226,15 +243,20 @@ impl SafekeeperReconciler {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let concurrency = service.config.safekeeper_reconciler_concurrency;
let ongoing_tokens = Arc::new(ClashMap::new());
let mut reconciler = SafekeeperReconciler {
inner: SafekeeperReconcilerInner { service },
inner: SafekeeperReconcilerInner {
service,
ongoing_tokens: ongoing_tokens.clone(),
},
rx,
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
tx,
ongoing_tokens: Arc::new(ClashMap::new()),
ongoing_tokens,
token_id_counter: AtomicU64::new(0),
cancel,
};
tokio::spawn(async move { reconciler.run().await });
@@ -246,7 +268,9 @@ impl SafekeeperReconciler {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel)) = req else { break };
let Some((req, req_cancel, req_token_id)) = req else {
break;
};
let permit_res = tokio::select! {
req = self.concurrency_limiter.clone().acquire_owned() => req,
@@ -265,7 +289,7 @@ impl SafekeeperReconciler {
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
inner
.reconcile_one(req, req_cancel)
.reconcile_one(req, req_cancel, req_token_id)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
@@ -280,8 +304,14 @@ impl SafekeeperReconciler {
}
impl SafekeeperReconcilerInner {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
async fn reconcile_one(
&self,
req: ScheduleRequest,
req_cancel: CancellationToken,
req_token_id: TokenId,
) {
let req_host = req.safekeeper.skp.host.clone();
let success;
match req.kind {
SafekeeperTimelineOpKind::Pull => {
let Some(timeline_id) = req.timeline_id else {
@@ -302,19 +332,22 @@ impl SafekeeperReconcilerInner {
tenant_id: req.tenant_id,
timeline_id,
};
self.reconcile_inner(
req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!("timeline already present on safekeeper on {req_host}");
}
},
req_cancel,
)
.await;
success = self
.reconcile_inner(
&req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!(
"timeline already present on safekeeper on {req_host}"
);
}
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Exclude => {
// TODO actually exclude instead of delete here
@@ -325,22 +358,23 @@ impl SafekeeperReconcilerInner {
);
return;
};
self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
success = self
.reconcile_inner(
&req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
let deleted = self
success = self
.reconcile_inner(
req,
&req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
@@ -348,13 +382,13 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if deleted {
if success {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
} else {
let deleted = self
success = self
.reconcile_inner(
req,
&req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
@@ -362,12 +396,21 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if deleted {
if success {
self.delete_tenant_timelines_from_db(tenant_id).await;
}
}
}
}
if success {
self.ongoing_tokens.remove_if(
&(req.tenant_id, req.timeline_id),
|_ttid, (_cancel, token_id)| {
// Ensure that this request is indeed the request we just finished and not a new one
req_token_id == *token_id
},
);
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
@@ -421,10 +464,10 @@ impl SafekeeperReconcilerInner {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully
/// Returns whether the reconciliation happened successfully (or we got cancelled)
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
req: &ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: CancellationToken,