From 82583853018c867acc8ddfe6e57de167d9c9dc9d Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 19 Jul 2024 10:03:46 +0000 Subject: [PATCH] remove indentation level with exhaustive match --- .../src/tenant/timeline/detach_ancestor.rs | 278 +++++++++--------- 1 file changed, 134 insertions(+), 144 deletions(-) diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index ce170451c2..1a4c58b7ce 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -987,17 +987,6 @@ pub(super) async fn complete( Deleted(TimelineId, Lsn), } - impl Ancestor { - fn as_branchpoint(&self) -> Option<(&Arc, &Lsn)> { - use Ancestor::*; - - match self { - NotDetached(tl, lsn) | Detached(tl, lsn) => Some((tl, lsn)), - Deleted(..) => None, - } - } - } - let (recorded_branchpoint, detach_is_ongoing) = { let access = detached.remote_client.initialized_upload_queue()?; let latest = access.latest_uploaded_index_part(); @@ -1045,153 +1034,154 @@ pub(super) async fn complete( // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart // which could give us a completely wrong layer combination. - if let Ancestor::NotDetached(ancestor, ancestor_lsn) = &ancestor { - detached - .remote_client - .schedule_adding_existing_layers_to_index_detach_and_wait( - &layers, - (ancestor.timeline_id, *ancestor_lsn), - ) - .await - .context("publish layers and detach ancestor")?; + let (ancestor, ancestor_lsn) = match ancestor { + Ancestor::NotDetached(ancestor, ancestor_lsn) => { + detached + .remote_client + .schedule_adding_existing_layers_to_index_detach_and_wait( + &layers, + (ancestor.timeline_id, ancestor_lsn), + ) + .await + .context("publish layers and detach ancestor")?; + + (ancestor, ancestor_lsn) + } + Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn), + Ancestor::Deleted(..) => { + return Ok((reparented_direct_children(detached, tenant)?, true)); + } + }; + + assert!(detach_is_ongoing, "to reparent, gc must still be blocked"); + let mut tasks = tokio::task::JoinSet::new(); + + fn fail_all_but_one() -> bool { + fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true); + false } - let (reparented, reparented_all) = - if let Some((ancestor, ancestor_lsn)) = ancestor.as_branchpoint() { - assert!(detach_is_ongoing, "to reparent, gc must still be blocked"); - let mut tasks = tokio::task::JoinSet::new(); + let failpoint_sem = if fail_all_but_one() { + Some(Arc::new(tokio::sync::Semaphore::new(1))) + } else { + None + }; - fn fail_all_but_one() -> bool { - fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true); - false + // because we are now keeping the slot in progress, it is unlikely that there will be any + // timeline deletions during this time. if we raced one, then we'll just ignore it. + tenant + .timelines + .lock() + .unwrap() + .values() + .filter_map(|tl| { + if Arc::ptr_eq(tl, detached) { + return None; } - let failpoint_sem = if fail_all_but_one() { - Some(Arc::new(tokio::sync::Semaphore::new(1))) + let tl_ancestor = tl.ancestor_timeline.as_ref()?; + let is_same = Arc::ptr_eq(&ancestor, tl_ancestor); + let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn; + + let is_deleting = tl + .delete_progress + .try_lock() + .map(|flow| !flow.is_not_started()) + .unwrap_or(true); + + if is_same && is_earlier && !is_deleting { + Some(tl.clone()) } else { None - }; + } + }) + .for_each(|timeline| { + // important in this scope: we are holding the Tenant::timelines lock + let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id); + let new_parent = detached.timeline_id; + let failpoint_sem = failpoint_sem.clone(); - // because we are now keeping the slot in progress, it is unlikely that there will be any - // timeline deletions during this time. if we raced one, then we'll just ignore it. - tenant - .timelines - .lock() - .unwrap() - .values() - .filter_map(|tl| { - if Arc::ptr_eq(tl, detached) { - return None; - } - - let tl_ancestor = tl.ancestor_timeline.as_ref()?; - let is_same = Arc::ptr_eq(&ancestor, tl_ancestor); - let is_earlier = tl.get_ancestor_lsn() <= *ancestor_lsn; - - let is_deleting = tl - .delete_progress - .try_lock() - .map(|flow| !flow.is_not_started()) - .unwrap_or(true); - - if is_same && is_earlier && !is_deleting { - Some(tl.clone()) - } else { - None - } - }) - .for_each(|timeline| { - // important in this scope: we are holding the Tenant::timelines lock - let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id); - let new_parent = detached.timeline_id; - let failpoint_sem = failpoint_sem.clone(); - - tasks.spawn( - async move { - let res = async { - if let Some(failpoint_sem) = failpoint_sem { - let permit = failpoint_sem.acquire().await.map_err(|_| { - anyhow::anyhow!( - "failpoint: timeline-detach-ancestor::allow_one_reparented", - ) - })?; - permit.forget(); - failpoint_sem.close(); - } - - timeline - .remote_client - .schedule_reparenting_and_wait(&new_parent) - .await - } - .await; - - match res { - Ok(()) => { - tracing::info!("reparented"); - Some(timeline) - } - Err(e) => { - // with the use of tenant slot, timeline deletion is the most likely - // reason. - tracing::warn!("reparenting failed: {e:#}"); - None - } - } + tasks.spawn( + async move { + let res = async { + if let Some(failpoint_sem) = failpoint_sem { + let permit = failpoint_sem.acquire().await.map_err(|_| { + anyhow::anyhow!( + "failpoint: timeline-detach-ancestor::allow_one_reparented", + ) + })?; + permit.forget(); + failpoint_sem.close(); } - .instrument(span), - ); - }); - let reparenting_candidates = tasks.len(); - let mut reparented = Vec::with_capacity(tasks.len()); + timeline + .remote_client + .schedule_reparenting_and_wait(&new_parent) + .await + } + .await; - while let Some(res) = tasks.join_next().await { - match res { - Ok(Some(timeline)) => { - reparented.push((timeline.ancestor_lsn, timeline.timeline_id)); + match res { + Ok(()) => { + tracing::info!("reparented"); + Some(timeline) + } + Err(e) => { + // with the use of tenant slot, timeline deletion is the most likely + // reason. + tracing::warn!("reparenting failed: {e:#}"); + None + } } - Ok(None) => { - // lets just ignore this for now. one or all reparented timelines could had - // started deletion, and that is fine. deleting a timeline is the most likely - // reason for this. - } - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { - // ignore; it's better to continue with a single reparenting failing (or even - // all of them) in order to get to the goal state. we will retry this after - // restart. - } - Err(je) => tracing::error!("unexpected join error: {je:?}"), } + .instrument(span), + ); + }); + + let reparenting_candidates = tasks.len(); + let mut reparented = Vec::with_capacity(tasks.len()); + + while let Some(res) = tasks.join_next().await { + match res { + Ok(Some(timeline)) => { + reparented.push((timeline.ancestor_lsn, timeline.timeline_id)); } - - let reparented_all = reparenting_candidates == reparented.len(); - - if reparented_all { - // FIXME: we must return 503 kind of response in the end; do the restart anyways because - // otherwise the runtime state remains diverged - tracing::info!( - reparented = reparented.len(), - candidates = reparenting_candidates, - "failed to reparent all candidates; they will be retried after the restart", - ); - - // TODO: two-state Ok(return_value)? - (Vec::new(), false) - } else { - reparented.sort_unstable(); - - let reparented = reparented - .into_iter() - .map(|(_, timeline_id)| timeline_id) - .collect(); - - (reparented, true) + Ok(None) => { + // lets just ignore this for now. one or all reparented timelines could had + // started deletion, and that is fine. deleting a timeline is the most likely + // reason for this. } - } else { - (reparented_direct_children(detached, tenant)?, true) - }; + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + // ignore; it's better to continue with a single reparenting failing (or even + // all of them) in order to get to the goal state. we will retry this after + // restart. + } + Err(je) => tracing::error!("unexpected join error: {je:?}"), + } + } - Ok((reparented, reparented_all)) + let reparented_all = reparenting_candidates == reparented.len(); + + if reparented_all { + // FIXME: we must return 503 kind of response in the end; do the restart anyways because + // otherwise the runtime state remains diverged + tracing::info!( + reparented = reparented.len(), + candidates = reparenting_candidates, + "failed to reparent all candidates; they will be retried after the restart", + ); + + reparented.sort_unstable(); + + let reparented = reparented + .into_iter() + .map(|(_, timeline_id)| timeline_id) + .collect(); + + Ok((reparented, true)) + } else { + // TODO: two-state Ok(return_value)? + Ok((Vec::new(), false)) + } }