storcon: timeline deletion improvements and fixes (#11334)

This PR contains a bunch of smaller follow-ups and fixes for the original
PR #11058. Most of these implement suggestions from Arseny:

* remove `Queryable, Selectable` from `TimelinePersistence`: they are
not needed.
* no `Arc` around `CancellationToken`: it is itself an `Arc`-like wrapper
around shared state, so cloning the token is enough (see the first sketch
below)
* only schedule deletes instead of scheduling excludes and deletes
* persist and delete deletion ops
* delete rows in timelines table upon tenant and timeline deletion
* set `deleted_at` for timelines we are deleting before we start any
reconciles: this flag will later help us recognize half-executed
deletions, or cases where we crashed before we could remove the timeline
row but after we removed the last pending op (handling these situations
is left for later; see the second sketch below).
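
For illustration (this snippet is not part of the diff): `tokio_util::sync::CancellationToken` is internally reference-counted, so clones and child tokens share one cancellation state, and wrapping it in an `Arc` adds nothing. A minimal sketch:

```rust
use tokio_util::sync::CancellationToken;

fn main() {
    let token = CancellationToken::new();
    let clone = token.clone(); // cheap handle to the same state, like cloning an Arc
    let child = token.child_token(); // cancelled whenever the parent is cancelled

    token.cancel();

    // Both the clone and the child observe the cancellation.
    assert!(clone.is_cancelled());
    assert!(child.is_cancelled());
}
```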
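
And a hedged sketch of the recovery check that `deleted_at` enables on startup; the row shape and helper below are hypothetical, not code from this PR:

```rust
use chrono::{DateTime, Utc};

// Hypothetical, trimmed-down shape of a `timelines` row.
struct TimelineRow {
    deleted_at: Option<DateTime<Utc>>,
}

// `deleted_at` set but no pending ops left: we crashed after removing the
// last pending op but before removing the row, so the deletion needs to be
// re-driven (actual handling is left for a later PR).
fn is_half_executed_deletion(row: &TimelineRow, pending_ops: usize) -> bool {
    row.deleted_at.is_some() && pending_ops == 0
}

fn main() {
    let tombstoned = TimelineRow { deleted_at: Some(Utc::now()) };
    assert!(is_half_executed_deletion(&tombstoned, 0));
}
```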

Part of #9011
Author: Arpad Müller, 2025-04-01 17:16:33 +02:00 (committed by GitHub)
parent 016068b966, commit 1fad1abb24
3 changed files with 215 additions and 44 deletions

View File

@@ -1369,6 +1369,65 @@ impl Persistence {
Ok(timeline_from_db)
}
/// Set `deleted_at` for the given timeline
pub(crate) async fn timeline_set_deleted_at(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines;
let deletion_time = chrono::Local::now().to_utc();
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set(timelines::deleted_at.eq(Some(deletion_time)))
.execute(conn)
.await?;
match updated {
0 => Ok(()),
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
updated
))),
}
})
})
.await
}
/// Delete timeline from db.
///
/// Only works if `deleted_at` is set, so you should call [`Self::timeline_set_deleted_at`] before.
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
Box::pin(async move {
diesel::delete(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::deleted_at.is_not_null())
.execute(conn)
.await?;
Ok(())
})
})
.await?;
Ok(())
}
/// Loads the list of all timelines for the given tenant from the database.
pub(crate) async fn list_timelines_for_tenant(
&self,
@@ -1491,6 +1550,34 @@ impl Persistence {
Ok(timeline_from_db)
}
/// List pending operations for a given timeline (including tenant-global ones)
pub(crate) async fn list_pending_ops_for_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
let timelines_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(
dsl::timeline_id
.eq(timeline_id.to_string())
.or(dsl::timeline_id.eq("")),
)
.load(conn)
.await?;
Ok(from_db)
})
})
.await?;
Ok(timelines_from_db)
}
/// Delete all pending ops for the given timeline.
///
@@ -1974,7 +2061,7 @@ impl ToSql<crate::schema::sql_types::PgLsn, Pg> for LsnWrapper {
}
}
-#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
+#[derive(Insertable, AsChangeset, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,

View File

@@ -160,9 +160,8 @@ pub(crate) struct ScheduleRequest {
}
struct ReconcilerHandle {
-tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
-#[allow(clippy::type_complexity)]
-ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), Arc<CancellationToken>>>,
+tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
+ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
cancel: CancellationToken,
}
@@ -172,13 +171,13 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
-) -> Arc<CancellationToken> {
+) -> CancellationToken {
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
cancel.cancel();
}
-entry.insert(Arc::new(self.cancel.child_token())).clone()
+entry.insert(self.cancel.child_token()).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
@@ -197,7 +196,7 @@ impl ReconcilerHandle {
pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
-rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
+rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}
@@ -243,7 +242,7 @@ impl SafekeeperReconciler {
.await;
}
}
-async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
+async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {
SafekeeperTimelineOpKind::Pull => {
@@ -300,36 +299,96 @@ impl SafekeeperReconciler {
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
-self.reconcile_inner(
+let deleted = self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
},
req_cancel,
)
.await;
+if deleted {
+self.delete_timeline_from_db(tenant_id, timeline_id).await;
+}
} else {
-self.reconcile_inner(
-req,
-async |client| client.delete_tenant(tenant_id).await,
-|_resp| {
-tracing::info!("deleted tenant from {req_host}");
-},
-req_cancel,
-)
-.await;
+let deleted = self
+.reconcile_inner(
+req,
+async |client| client.delete_tenant(tenant_id).await,
+|_resp| {
+tracing::info!(%tenant_id, "deleted tenant from {req_host}");
+},
+req_cancel,
+)
+.await;
+if deleted {
+self.delete_tenant_timelines_from_db(tenant_id).await;
+}
}
}
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
.service
.persistence
.list_pending_ops_for_timeline(tenant_id, timeline_id)
.await
{
Ok(list) => {
if !list.is_empty() {
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
return;
}
}
Err(e) => {
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
return;
}
}
tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
// In theory we could crash right after deleting the op from the db and right before reaching this,
// but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
if let Err(err) = self
.service
.persistence
.delete_timeline(tenant_id, timeline_id)
.await
{
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
}
}
async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
let timeline_list = match self
.service
.persistence
.list_timelines_for_tenant(tenant_id)
.await
{
Ok(timeline_list) => timeline_list,
Err(e) => {
tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
return;
}
};
for timeline in timeline_list {
let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
continue;
};
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
-req_cancel: Arc<CancellationToken>,
-) where
+req_cancel: CancellationToken,
+) -> bool
+where
F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
{
let jwt = self
@@ -373,11 +432,11 @@ impl SafekeeperReconciler {
req.safekeeper.skp.host
);
}
-return;
+return true;
}
Err(mgmt_api::Error::Cancelled) => {
// On cancellation, the code that issued it will take care of removing db entries (if needed)
-return;
+return false;
}
Err(e) => {
tracing::info!(

View File

@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -313,25 +313,32 @@ impl Service {
);
return Ok(());
};
+self.persistence
+.timeline_set_deleted_at(tenant_id, timeline_id)
+.await?;
let all_sks = tl
.new_sk_set
.iter()
-.flat_map(|sks| {
-sks.iter()
-.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
-})
-.chain(
-tl.sk_set
-.iter()
-.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
-)
-.collect::<HashMap<_, _>>();
+.flatten()
+.chain(tl.sk_set.iter())
+.collect::<HashSet<_>>();
// Schedule reconciliations
+for &sk_id in all_sks.iter() {
+let pending_op = TimelinePendingOpPersistence {
+tenant_id: tenant_id.to_string(),
+timeline_id: timeline_id.to_string(),
+generation: tl.generation,
+op_kind: SafekeeperTimelineOpKind::Delete,
+sk_id: *sk_id,
+};
+tracing::info!("writing pending op for sk id {sk_id}");
+self.persistence.insert_pending_op(pending_op).await?;
+}
{
let mut locked = self.inner.write().unwrap();
-for (sk_id, kind) in all_sks {
-let sk_id = NodeId(sk_id as u64);
+for sk_id in all_sks {
+let sk_id = NodeId(*sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
@@ -345,7 +352,7 @@ impl Service {
tenant_id,
timeline_id: Some(timeline_id),
generation: tl.generation as u32,
-kind,
+kind: SafekeeperTimelineOpKind::Delete,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
@@ -379,32 +386,50 @@ impl Service {
})
.collect::<Result<Vec<_>, ApiError>>()?;
-// Remove pending ops from db.
+// Remove pending ops from db, and set `deleted_at`.
// We cancel them in a later iteration once we hold the state lock.
for (timeline_id, _timeline) in timeline_list.iter() {
self.persistence
.remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
.await?;
+self.persistence
+.timeline_set_deleted_at(tenant_id, *timeline_id)
+.await?;
}
-let mut locked = self.inner.write().unwrap();
// The list of safekeepers that have any of the timelines
let mut sk_list = HashSet::new();
// List all pending ops for all timelines, cancel them
-for (timeline_id, timeline) in timeline_list.iter() {
+for (_timeline_id, timeline) in timeline_list.iter() {
let sk_iter = timeline
.sk_set
.iter()
.chain(timeline.new_sk_set.iter().flatten())
.map(|id| NodeId(*id as u64));
-for sk_id in sk_iter.clone() {
+sk_list.extend(sk_iter);
}
+for &sk_id in sk_list.iter() {
+let pending_op = TimelinePendingOpPersistence {
+tenant_id: tenant_id.to_string(),
+timeline_id: String::new(),
+generation: i32::MAX,
+op_kind: SafekeeperTimelineOpKind::Delete,
+sk_id: sk_id.0 as i64,
+};
+tracing::info!("writing pending op for sk id {sk_id}");
+self.persistence.insert_pending_op(pending_op).await?;
+}
+let mut locked = self.inner.write().unwrap();
+for (timeline_id, _timeline) in timeline_list.iter() {
+for sk_id in sk_list.iter() {
locked
.safekeeper_reconcilers
-.cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id));
+.cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
}
-sk_list.extend(sk_iter);
}
// unwrap is safe: we return above for an empty timeline list