fix: bifurcate the detach+reparent step

Author: Joonas Koivunen
Date:   2024-07-19 09:54:21 +00:00
parent 881e1ad056
commit cdfaf0700f
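
The change splits `complete` into two independently retryable steps: publishing the
prepared layers and detaching (only when an ancestor is still attached), and
reparenting the other timelines (only while the old branchpoint still exists). A
minimal sketch of the new control flow, assuming stand-in types; `Timeline`, `Lsn`,
and `TimelineId` below are placeholders, not the pageserver's real definitions:

    use std::sync::Arc;

    #[derive(Debug)]
    struct Timeline; // stand-in for the pageserver Timeline
    type Lsn = u64; // stand-in for utils::lsn::Lsn
    type TimelineId = u128; // stand-in for utils::id::TimelineId

    #[derive(Debug)]
    enum Ancestor {
        // first attempt: the ancestor link is still present, detach must happen first
        NotDetached(Arc<Timeline>, Lsn),
        // retry: already detached, ancestor still exists, only reparenting remains
        Detached(Arc<Timeline>, Lsn),
        // retry: already detached and the ancestor has since been deleted
        Deleted(TimelineId, Lsn),
    }

    impl Ancestor {
        fn as_branchpoint(&self) -> Option<(&Arc<Timeline>, &Lsn)> {
            match self {
                Ancestor::NotDetached(tl, lsn) | Ancestor::Detached(tl, lsn) => Some((tl, lsn)),
                Ancestor::Deleted(..) => None,
            }
        }
    }

    fn complete(ancestor: &Ancestor) {
        // step 1: publish layers and detach, but only if not already detached
        if let Ancestor::NotDetached(_tl, lsn) = ancestor {
            println!("detaching at {lsn}");
        }
        // step 2: reparent children, but only while the branchpoint is still alive
        match ancestor.as_branchpoint() {
            Some((_tl, lsn)) => println!("reparenting timelines branched at or below {lsn}"),
            None => println!("ancestor deleted; nothing to reparent"),
        }
    }

    fn main() {
        complete(&Ancestor::NotDetached(Arc::new(Timeline), 42));
        complete(&Ancestor::Deleted(7, 42));
    }

On a retry after a crash, the branchpoint persisted in the uploaded index_part
selects the `Detached` or `Deleted` variant, so the already-completed detach is not
re-attempted.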


@@ -969,10 +969,60 @@ pub(super) async fn complete(
 ) -> Result<(Vec<TimelineId>, bool), anyhow::Error> {
     let PreparedTimelineDetach { layers } = prepared;

-    let ancestor = detached
-        .get_ancestor_timeline()
-        .expect("must still have a ancestor");
-    let ancestor_lsn = detached.get_ancestor_lsn();
+    #[derive(Debug)]
+    enum Ancestor {
+        NotDetached(Arc<Timeline>, Lsn),
+        Detached(Arc<Timeline>, Lsn),
+        Deleted(TimelineId, Lsn),
+    }
+
+    impl Ancestor {
+        fn as_branchpoint(&self) -> Option<(&Arc<Timeline>, &Lsn)> {
+            use Ancestor::*;
+            match self {
+                NotDetached(tl, lsn) | Detached(tl, lsn) => Some((tl, lsn)),
+                Deleted(..) => None,
+            }
+        }
+    }
+
+    let (recorded_branchpoint, detach_is_ongoing) = {
+        let access = detached.remote_client.initialized_upload_queue()?;
+        let latest = access.latest_uploaded_index_part();
+
+        (
+            latest.lineage.detached_previous_ancestor(),
+            latest.ongoing_detach_ancestor.is_some(),
+        )
+    };
+
+    let ancestor = if let Some(ancestor) = detached.ancestor_timeline.as_ref() {
+        assert!(
+            recorded_branchpoint.is_none(),
+            "it should be impossible to get to here without having gone through the tenant reset"
+        );
+
+        Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
+    } else if let Some((ancestor_id, ancestor_lsn)) = recorded_branchpoint {
+        // it has been either:
+        // - detached but still exists => we can try reparenting
+        // - detached and deleted
+        //
+        // either way, we must complete
+        assert!(
+            layers.is_empty(),
+            "no layers should had been copied as detach is done"
+        );
+
+        let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
+        if let Some(ancestor) = existing {
+            Ancestor::Detached(ancestor, ancestor_lsn)
+        } else {
+            Ancestor::Deleted(ancestor_id, ancestor_lsn)
+        }
+    } else {
+        panic!("bug: complete called on a timeline which has not been detached or which has no live ancestor");
+    };

     // publish the prepared layers before we reparent any of the timelines, so that on restart
     // reparented timelines find layers. also do the actual detaching.
@@ -983,140 +1033,144 @@ pub(super) async fn complete(
     //
     // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
     // which could give us a completely wrong layer combination.
-    detached
-        .remote_client
-        .schedule_adding_existing_layers_to_index_detach_and_wait(
-            &layers,
-            (ancestor.timeline_id, ancestor_lsn),
-        )
-        .await
-        .context("publish layers and detach ancestor")?;
-
-    // FIXME: assert that the persistent record of inprogress detach exists
-    // FIXME: assert that gc is still blocked
-
-    let mut tasks = tokio::task::JoinSet::new();
-
-    fn fail_all_but_one() -> bool {
-        fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true);
-        false
-    }
-
-    let failpoint_sem = if fail_all_but_one() {
-        Some(Arc::new(tokio::sync::Semaphore::new(1)))
-    } else {
-        None
-    };
-
-    // because we are now keeping the slot in progress, it is unlikely that there will be any
-    // timeline deletions during this time. if we raced one, then we'll just ignore it.
-    tenant
-        .timelines
-        .lock()
-        .unwrap()
-        .values()
-        .filter_map(|tl| {
-            if Arc::ptr_eq(tl, detached) {
-                return None;
-            }
-
-            if !tl.is_active() {
-                return None;
-            }
-
-            let tl_ancestor = tl.ancestor_timeline.as_ref()?;
-            let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
-            let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
-
-            let is_deleting = tl
-                .delete_progress
-                .try_lock()
-                .map(|flow| !flow.is_not_started())
-                .unwrap_or(true);
-
-            if is_same && is_earlier && !is_deleting {
-                Some(tl.clone())
-            } else {
-                None
-            }
-        })
-        .for_each(|timeline| {
-            // important in this scope: we are holding the Tenant::timelines lock
-            let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
-            let new_parent = detached.timeline_id;
-            let failpoint_sem = failpoint_sem.clone();
-
-            tasks.spawn(
-                async move {
-                    let res = async {
-                        if let Some(failpoint_sem) = failpoint_sem {
-                            let permit = failpoint_sem.acquire().await.map_err(|_| {
-                                anyhow::anyhow!(
-                                    "failpoint: timeline-detach-ancestor::allow_one_reparented",
-                                )
-                            })?;
-                            permit.forget();
-                            failpoint_sem.close();
-                        }
-
-                        timeline
-                            .remote_client
-                            .schedule_reparenting_and_wait(&new_parent)
-                            .await
-                    }
-                    .await;
-
-                    match res {
-                        Ok(()) => {
-                            tracing::info!("reparented");
-                            Some(timeline)
-                        }
-                        Err(e) => {
-                            // with the use of tenant slot, timeline deletion is the most likely
-                            // reason.
-                            tracing::warn!("reparenting failed: {e:#}");
-                            None
-                        }
-                    }
-                }
-                .instrument(span),
-            );
-        });
-
-    let reparenting_candidates = tasks.len();
-    let mut reparented = Vec::with_capacity(tasks.len());
-
-    while let Some(res) = tasks.join_next().await {
-        match res {
-            Ok(Some(timeline)) => {
-                reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
-            }
-            Ok(None) => {
-                // lets just ignore this for now. one or all reparented timelines could had
-                // started deletion, and that is fine. deleting a timeline is the most likely
-                // reason for this.
-            }
-            Err(je) if je.is_cancelled() => unreachable!("not used"),
-            Err(je) if je.is_panic() => {
-                // ignore; it's better to continue with a single reparenting failing (or even
-                // all of them) in order to get to the goal state. we will retry this after
-                // restart.
-            }
-            Err(je) => tracing::error!("unexpected join error: {je:?}"),
-        }
-    }
-
-    let reparented_all = reparenting_candidates == reparented.len();
-
-    if !reparented_all {
-        // FIXME: we must return 503 kind of response in the end; do the restart anyways because
-        // otherwise the runtime state remains diverged
-        tracing::info!(
-            reparented = reparented.len(),
-            candidates = reparenting_candidates,
-            "failed to reparent all candidates; they will be retried after the restart",
-        );
-    }
+    if let Ancestor::NotDetached(ancestor, ancestor_lsn) = &ancestor {
+        detached
+            .remote_client
+            .schedule_adding_existing_layers_to_index_detach_and_wait(
+                &layers,
+                (ancestor.timeline_id, *ancestor_lsn),
+            )
+            .await
+            .context("publish layers and detach ancestor")?;
+    }
+
+    let (mut reparented, reparented_all) =
+        if let Some((ancestor, ancestor_lsn)) = ancestor.as_branchpoint() {
+            assert!(detach_is_ongoing, "to reparent, gc must still be blocked");
+
+            let mut tasks = tokio::task::JoinSet::new();
+
+            fn fail_all_but_one() -> bool {
+                fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true);
+                false
+            }
+
+            let failpoint_sem = if fail_all_but_one() {
+                Some(Arc::new(tokio::sync::Semaphore::new(1)))
+            } else {
+                None
+            };
+
+            // because we are now keeping the slot in progress, it is unlikely that there will be any
+            // timeline deletions during this time. if we raced one, then we'll just ignore it.
+            tenant
+                .timelines
+                .lock()
+                .unwrap()
+                .values()
+                .filter_map(|tl| {
+                    if Arc::ptr_eq(tl, detached) {
+                        return None;
+                    }
+
+                    if !tl.is_active() {
+                        return None;
+                    }
+
+                    let tl_ancestor = tl.ancestor_timeline.as_ref()?;
+                    let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
+                    let is_earlier = tl.get_ancestor_lsn() <= *ancestor_lsn;
+
+                    let is_deleting = tl
+                        .delete_progress
+                        .try_lock()
+                        .map(|flow| !flow.is_not_started())
+                        .unwrap_or(true);
+
+                    if is_same && is_earlier && !is_deleting {
+                        Some(tl.clone())
+                    } else {
+                        None
+                    }
+                })
+                .for_each(|timeline| {
+                    // important in this scope: we are holding the Tenant::timelines lock
+                    let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
+                    let new_parent = detached.timeline_id;
+                    let failpoint_sem = failpoint_sem.clone();
+
+                    tasks.spawn(
+                        async move {
+                            let res = async {
+                                if let Some(failpoint_sem) = failpoint_sem {
+                                    let permit = failpoint_sem.acquire().await.map_err(|_| {
+                                        anyhow::anyhow!(
+                                            "failpoint: timeline-detach-ancestor::allow_one_reparented",
+                                        )
+                                    })?;
+                                    permit.forget();
+                                    failpoint_sem.close();
+                                }
+
+                                timeline
+                                    .remote_client
+                                    .schedule_reparenting_and_wait(&new_parent)
+                                    .await
+                            }
+                            .await;
+
+                            match res {
+                                Ok(()) => {
+                                    tracing::info!("reparented");
+                                    Some(timeline)
+                                }
+                                Err(e) => {
+                                    // with the use of tenant slot, timeline deletion is the most likely
+                                    // reason.
+                                    tracing::warn!("reparenting failed: {e:#}");
+                                    None
+                                }
+                            }
+                        }
+                        .instrument(span),
+                    );
+                });
+
+            let reparenting_candidates = tasks.len();
+            let mut reparented = Vec::with_capacity(tasks.len());
+
+            while let Some(res) = tasks.join_next().await {
+                match res {
+                    Ok(Some(timeline)) => {
+                        reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
+                    }
+                    Ok(None) => {
+                        // lets just ignore this for now. one or all reparented timelines could had
+                        // started deletion, and that is fine. deleting a timeline is the most likely
+                        // reason for this.
+                    }
+                    Err(je) if je.is_cancelled() => unreachable!("not used"),
+                    Err(je) if je.is_panic() => {
+                        // ignore; it's better to continue with a single reparenting failing (or even
+                        // all of them) in order to get to the goal state. we will retry this after
+                        // restart.
+                    }
+                    Err(je) => tracing::error!("unexpected join error: {je:?}"),
+                }
+            }
+
+            let reparented_all = reparenting_candidates == reparented.len();
+
+            if !reparented_all {
+                // FIXME: we must return 503 kind of response in the end; do the restart anyways because
+                // otherwise the runtime state remains diverged
+                tracing::info!(
+                    reparented = reparented.len(),
+                    candidates = reparenting_candidates,
+                    "failed to reparent all candidates; they will be retried after the restart",
+                );
+            }
+
+            (reparented, reparented_all)
+        } else {
+            // FIXME: again, get the list of (ancestor_lsn, reparented)
+            (Vec::new(), true)
+        };

     reparented.sort_unstable();
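
For intuition, the branchpoint classification at the top of the new hunk reduces to
a small total function over two optional inputs. A hedged sketch using the same
stand-in types as above; `classify` and its parameters are illustrative, not the
actual pageserver API:

    use std::collections::BTreeMap;
    use std::sync::Arc;

    struct Timeline;
    type Lsn = u64;
    type TimelineId = u128;

    enum Ancestor {
        NotDetached(Arc<Timeline>, Lsn),
        Detached(Arc<Timeline>, Lsn),
        Deleted(TimelineId, Lsn),
    }

    // Mirrors the branching at the top of `complete`: a live ancestor link means
    // the detach has not happened yet; otherwise the branchpoint recorded in the
    // uploaded index_part tells us whether the old ancestor still exists.
    fn classify(
        live_ancestor: Option<(Arc<Timeline>, Lsn)>,
        recorded_branchpoint: Option<(TimelineId, Lsn)>,
        timelines: &BTreeMap<TimelineId, Arc<Timeline>>,
    ) -> Ancestor {
        match (live_ancestor, recorded_branchpoint) {
            (Some((tl, lsn)), None) => Ancestor::NotDetached(tl, lsn),
            (None, Some((id, lsn))) => match timelines.get(&id) {
                Some(tl) => Ancestor::Detached(tl.clone(), lsn),
                None => Ancestor::Deleted(id, lsn),
            },
            (Some(_), Some(_)) => {
                unreachable!("a live ancestor link and a recorded detach cannot coexist")
            }
            (None, None) => panic!("complete called without any detach in progress"),
        }
    }

    fn main() {
        let timelines = BTreeMap::new();
        // a retry after the old ancestor was deleted lands in the Deleted arm
        assert!(matches!(
            classify(None, Some((7, 42)), &timelines),
            Ancestor::Deleted(7, 42)
        ));
    }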