From 00380cedd7187baabf44e0107fbf3b1ac522d3fb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Tue, 21 Jan 2025 15:32:48 +0100
Subject: [PATCH] Add support for timeline deletion

---
 storage_controller/src/persistence.rs        |  15 +-
 storage_controller/src/safekeeper_client.rs  |  18 +-
 storage_controller/src/service.rs            | 189 +++++++++++++++++-
 .../src/service/safekeeper_reconciler.rs     |   9 +-
 4 files changed, 221 insertions(+), 10 deletions(-)

diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index fbe79cc9b2..e94c9960ac 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -1224,11 +1224,12 @@ impl Persistence {
         )
         .await
     }
-    pub(crate) async fn update_timeline(
+    pub(crate) async fn update_timeline_status(
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
-        timeline_status: TimelineStatusKind,
+        status_kind: TimelineStatusKind,
+        status: String,
     ) -> DatabaseResult<()> {
         use crate::schema::timelines;
 
@@ -1238,7 +1239,10 @@
         let inserted_updated = diesel::update(timelines::table)
             .filter(timelines::tenant_id.eq(tenant_id.to_string()))
             .filter(timelines::timeline_id.eq(timeline_id.to_string()))
-            .set(timelines::status.eq(String::from(timeline_status)))
+            .set((
+                timelines::status_kind.eq(String::from(status_kind)),
+                timelines::status.eq(status.clone()),
+            ))
             .execute(conn)?;
 
         if inserted_updated != 1 {
@@ -1297,7 +1301,8 @@
                 .filter(
                     timelines::status
                         .eq(String::from(TimelineStatusKind::Creating))
-                        .or(timelines::status.eq(String::from(TimelineStatusKind::Deleting))),
+                        .or(timelines::status
+                            .eq(String::from(TimelineStatusKind::Deleting))),
                 )
                 .load::<TimelinePersistence>(conn)?)
             },
@@ -1621,4 +1626,4 @@ impl From<TimelineStatusKind> for String {
 pub(crate) struct TimelineStatusCreating {
     pub(crate) pg_version: u32,
     pub(crate) start_lsn: Lsn,
-}
\ No newline at end of file
+}
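
The new `status_kind`/`status` pair splits the machine-readable state from a JSON detail payload. The patch itself always passes `"{}".to_owned()` for the detail, but the `TimelineStatusCreating` struct at the end of the file suggests callers will serialize per-state details into the `status` column. Below is a minimal sketch of producing and reading back such a payload; the `serde` derives and the `String` stand-in for `utils::lsn::Lsn` are assumptions made to keep the sketch self-contained, not part of the patch.

```rust
// Sketch only: TimelineStatusCreating mirrors the struct in persistence.rs,
// but the derives and the String stand-in for utils::lsn::Lsn are assumed.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct TimelineStatusCreating {
    pg_version: u32,
    start_lsn: String, // stand-in for utils::lsn::Lsn
}

fn main() -> serde_json::Result<()> {
    let detail = TimelineStatusCreating {
        pg_version: 16,
        start_lsn: "0/16B5A50".to_string(),
    };
    // This JSON string is what a caller would hand to
    // Persistence::update_timeline_status as the `status` argument,
    // alongside TimelineStatusKind::Creating as the `status_kind`.
    let status_json = serde_json::to_string(&detail)?;
    println!("{status_json}");

    // Reading it back when inspecting a row:
    let parsed: TimelineStatusCreating = serde_json::from_str(&status_json)?;
    assert_eq!(parsed.pg_version, 16);
    Ok(())
}
```
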
diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs
index 2ade4c1f33..c65c967a5a 100644
--- a/storage_controller/src/safekeeper_client.rs
+++ b/storage_controller/src/safekeeper_client.rs
@@ -1,6 +1,9 @@
 use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
 use safekeeper_client::mgmt_api::{Client, Result};
-use utils::{id::NodeId, logging::SecretString};
+use utils::{
+    id::{NodeId, TenantId, TimelineId},
+    logging::SecretString,
+};
 
 /// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
 /// controller to collect metrics in a non-intrusive manner.
@@ -74,4 +77,17 @@ impl SafekeeperClient {
             self.inner.create_timeline(&req).await
         )
     }
+
+    pub(crate) async fn delete_timeline(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> Result {
+        measured_request!(
+            "delete_timeline",
+            crate::metrics::Method::Delete,
+            &self.node_id_label,
+            self.inner.delete_timeline(tenant_id, timeline_id).await
+        )
+    }
 }
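
`delete_timeline` follows the same shape as the existing `create_timeline` wrapper: its only purpose is to route the inner `mgmt_api` call through `measured_request!` so every safekeeper request is recorded per method and per node. The real macro lives elsewhere in the storage controller and is not part of this patch; the stand-in below only illustrates the wrapping pattern, and its names, the plain `"DELETE"` string in place of `crate::metrics::Method::Delete`, and the `println!` in place of metrics are all assumptions.

```rust
// Simplified stand-in for the measured_request! pattern; the real macro
// records latency and outcome into crate::metrics instead of printing.
use std::time::Instant;

macro_rules! measured_request {
    ($name:expr, $method:expr, $node:expr, $call:expr) => {{
        let started = Instant::now();
        let result = $call;
        println!(
            "{} {} to node {} took {:?} (ok = {})",
            $method,
            $name,
            $node,
            started.elapsed(),
            result.is_ok(),
        );
        result
    }};
}

fn main() {
    let node_id_label = "1";
    let outcome: Result<(), String> =
        measured_request!("delete_timeline", "DELETE", node_id_label, Ok(()));
    assert!(outcome.is_ok());
}
```
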
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 83de6a1a71..1e0ba5b7e4 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -3550,7 +3550,7 @@ impl Service {
 
         // TODO
         self.persistence
-            .update_timeline(tenant_id, timeline_id, status_kind)
+            .update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
             .await?;
         Ok(())
     }
@@ -4118,6 +4118,183 @@ impl Service {
         Ok(result)
     }
 
+    async fn tenant_timeline_delete_safekeepers_reconcile(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        tl_p: &TimelinePersistence,
+        sk_persistences: &HashMap<i64, SafekeeperPersistence>,
+    ) -> Result<(), ApiError> {
+        // If at least one deletion succeeded, return once the timeout has elapsed
+        // rather than waiting for all safekeepers.
+        let jwt = self.config.jwt_token.clone().map(SecretString::from);
+        let mut joinset = JoinSet::new();
+
+        let mut members = Vec::new();
+        for sk in tl_p.sk_set.iter() {
+            let Some(sk_p) = sk_persistences.get(sk) else {
+                return Err(ApiError::InternalServerError(anyhow!(
+                    "couldn't find persisted entry for safekeeper with id {sk}"
+                )));
+            };
+            members.push(SafekeeperId {
+                id: NodeId(sk_p.id as u64),
+                host: sk_p.host.clone(),
+                pg_port: sk_p.port as u16,
+            });
+        }
+
+        let sks_to_reconcile = &tl_p.sk_set;
+        for sk in sks_to_reconcile.iter() {
+            // Unwrap is fine: we would already have returned an error above
+            let sk_p = sk_persistences.get(sk).unwrap();
+            let sk_clone = NodeId(*sk as u64);
+            let base_url = sk_p.base_url();
+            let jwt = jwt.clone();
+            let cancel = self.cancel.clone();
+            joinset.spawn(async move {
+                let client = SafekeeperClient::new(sk_clone, base_url, jwt);
+                let retry_result = backoff::retry(
+                    || client.delete_timeline(tenant_id, timeline_id),
+                    |_e| {
+                        // TODO find right criteria here for deciding on retries
+                        false
+                    },
+                    3,
+                    5,
+                    "delete timeline on safekeeper",
+                    &cancel,
+                )
+                .await;
+                if let Some(res) = retry_result {
+                    res.map_err(|e| {
+                        ApiError::InternalServerError(
+                            anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
+                        )
+                    })
+                } else {
+                    Err(ApiError::Cancelled)
+                }
+            });
+        }
+        // After building the joinset, wait for the tasks to complete,
+        // but with a timeout so that we return swiftly with either
+        // a failure or a success.
+        const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
+        let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;
+
+        // Treat the first task to finish differently: if it times out,
+        // we have no successful deletion at all and must fail the request.
+        // We don't rely on the remaining tasks succeeding; this keeps deletion
+        // making progress even if a safekeeper is down.
+        let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
+            joinset.join_next().await.unwrap()
+        })
+        .await;
+        let mut reconcile_results = Vec::new();
+        match timeout_or_first {
+            Ok(Ok(res_1)) => {
+                reconcile_results.push(res_1);
+            }
+            Ok(Err(_)) => {
+                return Err(ApiError::InternalServerError(anyhow!(
+                    "task was cancelled while reconciling timeline deletion"
+                )));
+            }
+            Err(_) => {
+                return Err(ApiError::InternalServerError(anyhow!(
+                    "couldn't reconcile timeline deletion on safekeepers within timeout"
+                )));
+            }
+        }
+        let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
+            while let Some(next_res) = joinset.join_next().await {
+                match next_res {
+                    Ok(res) => {
+                        reconcile_results.push(res);
+                    }
+                    Err(e) => {
+                        tracing::info!("aborting reconciliation due to join error: {e:?}");
+                        break;
+                    }
+                }
+            }
+        });
+        if timeout_or_last.await.is_err() {
+            // Not an error if this times out or is cancelled: we already have feedback from at least one safekeeper
+            tracing::info!(
+                "timeout for last {} reconciliations",
+                sks_to_reconcile.len() - 1
+            );
+        }
+        // Check whether enough reconciliations succeeded in reconcile_results
+        let successful = reconcile_results
+            .into_iter()
+            .filter_map(|res| res.ok())
+            .collect::<Vec<_>>();
+        tracing::info!(
+            "Got {} successful results from reconciliation",
+            successful.len()
+        );
+        let new_status_kind = if successful.is_empty() {
+            // Failure
+            return Err(ApiError::InternalServerError(anyhow!(
+                "not enough successful reconciliations to reach quorum, please retry: {}",
+                successful.len()
+            )));
+        } else if successful.len() == sks_to_reconcile.len() {
+            // Full success: the timeline moves to the Deleted state
+            TimelineStatusKind::Deleted
+        } else if successful.len() == 2 {
+            // Partial success: the timeline stays in the Deleting state
+            TimelineStatusKind::Deleting
+        } else {
+            unreachable!(
+                "unexpected number of successful reconciliations {}",
+                successful.len()
+            );
+        };
+
+        if new_status_kind == TimelineStatusKind::Deleted {
+            self.persistence
+                .update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
+                .await?;
+        }
+        Ok(())
+    }
+    async fn tenant_timeline_delete_safekeepers(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> Result<(), ApiError> {
+        let tl = self
+            .persistence
+            .get_timeline(tenant_id, timeline_id)
+            .await?;
+        let status_kind =
+            TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
+        if status_kind != TimelineStatusKind::Deleting {
+            // Persist the Deleting status first so an interrupted deletion can be re-driven
+            let new_status_kind = TimelineStatusKind::Deleting;
+            self.persistence
+                .update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
+                .await?;
+        }
+        let sk_persistences = self
+            .persistence
+            .list_safekeepers()
+            .await?
+            .into_iter()
+            .map(|p| (p.id, p))
+            .collect::<HashMap<_, _>>();
+        self.tenant_timeline_delete_safekeepers_reconcile(
+            tenant_id,
+            timeline_id,
+            &tl,
+            &sk_persistences,
+        )
+        .await?;
+        Ok(())
+    }
     pub(crate) async fn tenant_timeline_delete(
         &self,
         tenant_id: TenantId,
@@ -4131,7 +4308,7 @@ impl Service {
             )
             .await;
 
-        self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
+        let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
             if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
@@ -4203,7 +4380,13 @@ impl Service {
                 )
                 .await?;
             Ok(shard_zero_status)
-        }).await?
+        });
+
+        let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);
+
+        let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
+        sk_res?;
+        ps_res?
     }
 
     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
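
The reconcile helper fans the deletion out to every safekeeper in `sk_set` and then applies a two-phase wait: the first completion must arrive before a 30-second deadline, while the remaining tasks are only drained until that same deadline and may be cut off. Below is a self-contained sketch of that deadline pattern using the same tokio primitives (`JoinSet`, `timeout_at`); the tasks, sleep durations, and counts are illustrative, not safekeeper requests.

```rust
// Sketch of the "first result must beat the deadline, the rest are drained
// best-effort against the same deadline" pattern used in the patch.
use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut joinset = JoinSet::new();
    for millis in [10u64, 20, 200] {
        joinset.spawn(async move {
            tokio::time::sleep(Duration::from_millis(millis)).await;
            millis
        });
    }

    let deadline = tokio::time::Instant::now() + Duration::from_millis(100);

    // The first completion must arrive before the deadline, otherwise fail.
    let first = tokio::time::timeout_at(deadline, joinset.join_next())
        .await
        .expect("no task finished before the deadline")
        .expect("at least one task was spawned")
        .expect("task panicked or was aborted");
    let mut finished = vec![first];

    // Drain the rest against the same deadline; a timeout here is tolerated.
    let _ = tokio::time::timeout_at(deadline, async {
        while let Some(res) = joinset.join_next().await {
            if let Ok(v) = res {
                finished.push(v);
            }
        }
    })
    .await;

    println!("{} of 3 tasks finished before the deadline", finished.len());
}
```
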
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index 74baea31c1..c5208c5f36 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -114,7 +114,14 @@ impl SafekeeperReconciler {
                     .await?;
             }
             TimelineStatusKind::Deleting => {
-                todo!()
+                self.service
+                    .tenant_timeline_delete_safekeepers_reconcile(
+                        tenant_id,
+                        timeline_id,
+                        &tl,
+                        sk_persistences,
+                    )
+                    .await?;
             }
         }
         Ok(())
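
With the reconciler now re-driving timelines stuck in `Deleting` through the same helper, the decision made after each deletion round is what keeps the process converging: acknowledgements from every safekeeper move the timeline to `Deleted`, a partial result leaves it in `Deleting` for the next reconciler pass, and no acknowledgement surfaces an error to the caller. The patch hard-codes a three-safekeeper set (the `successful.len() == 2` arm; a single acknowledgement currently falls through to the `unreachable!` arm). The sketch below restates that rule for an arbitrary `sk_set` size under the assumption that anything short of a majority should be treated as retryable; the names are illustrative and not part of the patch.

```rust
// Sketch of a generalized status decision for a deletion round.
// Assumes a majority-based quorum; the patch itself only handles
// the three-safekeeper case explicitly.
#[derive(Debug, PartialEq)]
enum DeletionOutcome {
    /// Every safekeeper acknowledged the deletion: persist Deleted.
    Deleted,
    /// A majority acknowledged: stay in Deleting, the reconciler retries the rest.
    Deleting,
    /// No majority: return an error so the caller retries the whole request.
    RetryNeeded,
}

fn classify(successes: usize, total: usize) -> DeletionOutcome {
    if successes == total {
        DeletionOutcome::Deleted
    } else if successes * 2 > total {
        DeletionOutcome::Deleting
    } else {
        DeletionOutcome::RetryNeeded
    }
}

fn main() {
    assert_eq!(classify(3, 3), DeletionOutcome::Deleted);
    assert_eq!(classify(2, 3), DeletionOutcome::Deleting);
    assert_eq!(classify(1, 3), DeletionOutcome::RetryNeeded);
    assert_eq!(classify(0, 3), DeletionOutcome::RetryNeeded);
}
```
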