diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index ac951227b5..e3dad3e3cb 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -1179,75 +1179,54 @@ pub(super) async fn detach_and_reparent( // because we are now keeping the slot in progress, it is unlikely that there will be any // timeline deletions during this time. if we raced one, then we'll just ignore it. - tenant - .timelines - .lock() - .unwrap() - .values() - .filter_map(|tl| { - if Arc::ptr_eq(tl, detached) { - return None; - } + { + let g = tenant.timelines.lock().unwrap(); + reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn) + .cloned() + .for_each(|timeline| { + // important in this scope: we are holding the Tenant::timelines lock + let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id); + let new_parent = detached.timeline_id; + #[cfg(feature = "testing")] + let failpoint_sem = failpoint_sem.clone(); - let tl_ancestor = tl.ancestor_timeline.as_ref()?; - let is_same = Arc::ptr_eq(&ancestor, tl_ancestor); - let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn; + tasks.spawn( + async move { + let res = async { + #[cfg(feature = "testing")] + if let Some(failpoint_sem) = failpoint_sem { + let _permit = failpoint_sem.acquire().await.map_err(|_| { + anyhow::anyhow!( + "failpoint: timeline-detach-ancestor::allow_one_reparented", + ) + })?; + failpoint_sem.close(); + } - let is_deleting = tl - .delete_progress - .try_lock() - .map(|flow| !flow.is_not_started()) - .unwrap_or(true); - - if is_same && is_earlier && !is_deleting { - Some(tl.clone()) - } else { - None - } - }) - .for_each(|timeline| { - // important in this scope: we are holding the Tenant::timelines lock - let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id); - let new_parent = detached.timeline_id; - #[cfg(feature = "testing")] - let failpoint_sem = failpoint_sem.clone(); - - tasks.spawn( - async move { - let res = async { - #[cfg(feature = "testing")] - if let Some(failpoint_sem) = failpoint_sem { - let _permit = failpoint_sem.acquire().await.map_err(|_| { - anyhow::anyhow!( - "failpoint: timeline-detach-ancestor::allow_one_reparented", - ) - })?; - failpoint_sem.close(); + timeline + .remote_client + .schedule_reparenting_and_wait(&new_parent) + .await } + .await; - timeline - .remote_client - .schedule_reparenting_and_wait(&new_parent) - .await - } - .await; - - match res { - Ok(()) => { - tracing::info!("reparented"); - Some(timeline) - } - Err(e) => { - // with the use of tenant slot, raced timeline deletion is the most - // likely reason. - tracing::warn!("reparenting failed: {e:#}"); - None + match res { + Ok(()) => { + tracing::info!("reparented"); + Some(timeline) + } + Err(e) => { + // with the use of tenant slot, raced timeline deletion is the most + // likely reason. + tracing::warn!("reparenting failed: {e:#}"); + None + } } } - } - .instrument(span), - ); - }); + .instrument(span), + ); + }); + } let reparenting_candidates = tasks.len(); let mut reparented = Vec::with_capacity(tasks.len()); @@ -1295,3 +1274,35 @@ pub(super) async fn detach_and_reparent( Ok(DetachingAndReparenting::SomeReparentingFailed { must_restart }) } } + +fn reparentable_timelines<'a, I>( + timelines: I, + detached: &'a Arc, + ancestor: &'a Arc, + ancestor_lsn: Lsn, +) -> impl Iterator> + 'a +where + I: Iterator> + 'a, +{ + timelines.filter_map(move |tl| { + if Arc::ptr_eq(tl, detached) { + return None; + } + + let tl_ancestor = tl.ancestor_timeline.as_ref()?; + let is_same = Arc::ptr_eq(ancestor, tl_ancestor); + let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn; + + let is_deleting = tl + .delete_progress + .try_lock() + .map(|flow| !flow.is_not_started()) + .unwrap_or(true); + + if is_same && is_earlier && !is_deleting { + Some(tl) + } else { + None + } + }) +}