remove indentation level with exhaustive match

This commit is contained in:
Joonas Koivunen
2024-07-19 10:03:46 +00:00
parent 6a8f00dea0
commit 8258385301

View File

@@ -987,17 +987,6 @@ pub(super) async fn complete(
Deleted(TimelineId, Lsn),
}
impl Ancestor {
fn as_branchpoint(&self) -> Option<(&Arc<Timeline>, &Lsn)> {
use Ancestor::*;
match self {
NotDetached(tl, lsn) | Detached(tl, lsn) => Some((tl, lsn)),
Deleted(..) => None,
}
}
}
let (recorded_branchpoint, detach_is_ongoing) = {
let access = detached.remote_client.initialized_upload_queue()?;
let latest = access.latest_uploaded_index_part();
@@ -1045,153 +1034,154 @@ pub(super) async fn complete(
// this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
// which could give us a completely wrong layer combination.
if let Ancestor::NotDetached(ancestor, ancestor_lsn) = &ancestor {
detached
.remote_client
.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, *ancestor_lsn),
)
.await
.context("publish layers and detach ancestor")?;
let (ancestor, ancestor_lsn) = match ancestor {
Ancestor::NotDetached(ancestor, ancestor_lsn) => {
detached
.remote_client
.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await
.context("publish layers and detach ancestor")?;
(ancestor, ancestor_lsn)
}
Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn),
Ancestor::Deleted(..) => {
return Ok((reparented_direct_children(detached, tenant)?, true));
}
};
assert!(detach_is_ongoing, "to reparent, gc must still be blocked");
let mut tasks = tokio::task::JoinSet::new();
fn fail_all_but_one() -> bool {
fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true);
false
}
let (reparented, reparented_all) =
if let Some((ancestor, ancestor_lsn)) = ancestor.as_branchpoint() {
assert!(detach_is_ongoing, "to reparent, gc must still be blocked");
let mut tasks = tokio::task::JoinSet::new();
let failpoint_sem = if fail_all_but_one() {
Some(Arc::new(tokio::sync::Semaphore::new(1)))
} else {
None
};
fn fail_all_but_one() -> bool {
fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| true);
false
// because we are now keeping the slot in progress, it is unlikely that there will be any
// timeline deletions during this time. if we raced one, then we'll just ignore it.
tenant
.timelines
.lock()
.unwrap()
.values()
.filter_map(|tl| {
if Arc::ptr_eq(tl, detached) {
return None;
}
let failpoint_sem = if fail_all_but_one() {
Some(Arc::new(tokio::sync::Semaphore::new(1)))
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
let is_deleting = tl
.delete_progress
.try_lock()
.map(|flow| !flow.is_not_started())
.unwrap_or(true);
if is_same && is_earlier && !is_deleting {
Some(tl.clone())
} else {
None
};
}
})
.for_each(|timeline| {
// important in this scope: we are holding the Tenant::timelines lock
let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
let new_parent = detached.timeline_id;
let failpoint_sem = failpoint_sem.clone();
// because we are now keeping the slot in progress, it is unlikely that there will be any
// timeline deletions during this time. if we raced one, then we'll just ignore it.
tenant
.timelines
.lock()
.unwrap()
.values()
.filter_map(|tl| {
if Arc::ptr_eq(tl, detached) {
return None;
}
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
let is_earlier = tl.get_ancestor_lsn() <= *ancestor_lsn;
let is_deleting = tl
.delete_progress
.try_lock()
.map(|flow| !flow.is_not_started())
.unwrap_or(true);
if is_same && is_earlier && !is_deleting {
Some(tl.clone())
} else {
None
}
})
.for_each(|timeline| {
// important in this scope: we are holding the Tenant::timelines lock
let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
let new_parent = detached.timeline_id;
let failpoint_sem = failpoint_sem.clone();
tasks.spawn(
async move {
let res = async {
if let Some(failpoint_sem) = failpoint_sem {
let permit = failpoint_sem.acquire().await.map_err(|_| {
anyhow::anyhow!(
"failpoint: timeline-detach-ancestor::allow_one_reparented",
)
})?;
permit.forget();
failpoint_sem.close();
}
timeline
.remote_client
.schedule_reparenting_and_wait(&new_parent)
.await
}
.await;
match res {
Ok(()) => {
tracing::info!("reparented");
Some(timeline)
}
Err(e) => {
// with the use of tenant slot, timeline deletion is the most likely
// reason.
tracing::warn!("reparenting failed: {e:#}");
None
}
}
tasks.spawn(
async move {
let res = async {
if let Some(failpoint_sem) = failpoint_sem {
let permit = failpoint_sem.acquire().await.map_err(|_| {
anyhow::anyhow!(
"failpoint: timeline-detach-ancestor::allow_one_reparented",
)
})?;
permit.forget();
failpoint_sem.close();
}
.instrument(span),
);
});
let reparenting_candidates = tasks.len();
let mut reparented = Vec::with_capacity(tasks.len());
timeline
.remote_client
.schedule_reparenting_and_wait(&new_parent)
.await
}
.await;
while let Some(res) = tasks.join_next().await {
match res {
Ok(Some(timeline)) => {
reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
match res {
Ok(()) => {
tracing::info!("reparented");
Some(timeline)
}
Err(e) => {
// with the use of tenant slot, timeline deletion is the most likely
// reason.
tracing::warn!("reparenting failed: {e:#}");
None
}
}
Ok(None) => {
// lets just ignore this for now. one or all reparented timelines could had
// started deletion, and that is fine. deleting a timeline is the most likely
// reason for this.
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// ignore; it's better to continue with a single reparenting failing (or even
// all of them) in order to get to the goal state. we will retry this after
// restart.
}
Err(je) => tracing::error!("unexpected join error: {je:?}"),
}
.instrument(span),
);
});
let reparenting_candidates = tasks.len();
let mut reparented = Vec::with_capacity(tasks.len());
while let Some(res) = tasks.join_next().await {
match res {
Ok(Some(timeline)) => {
reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
}
let reparented_all = reparenting_candidates == reparented.len();
if reparented_all {
// FIXME: we must return 503 kind of response in the end; do the restart anyways because
// otherwise the runtime state remains diverged
tracing::info!(
reparented = reparented.len(),
candidates = reparenting_candidates,
"failed to reparent all candidates; they will be retried after the restart",
);
// TODO: two-state Ok(return_value)?
(Vec::new(), false)
} else {
reparented.sort_unstable();
let reparented = reparented
.into_iter()
.map(|(_, timeline_id)| timeline_id)
.collect();
(reparented, true)
Ok(None) => {
// lets just ignore this for now. one or all reparented timelines could had
// started deletion, and that is fine. deleting a timeline is the most likely
// reason for this.
}
} else {
(reparented_direct_children(detached, tenant)?, true)
};
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// ignore; it's better to continue with a single reparenting failing (or even
// all of them) in order to get to the goal state. we will retry this after
// restart.
}
Err(je) => tracing::error!("unexpected join error: {je:?}"),
}
}
Ok((reparented, reparented_all))
let reparented_all = reparenting_candidates == reparented.len();
if reparented_all {
// FIXME: we must return 503 kind of response in the end; do the restart anyways because
// otherwise the runtime state remains diverged
tracing::info!(
reparented = reparented.len(),
candidates = reparenting_candidates,
"failed to reparent all candidates; they will be retried after the restart",
);
reparented.sort_unstable();
let reparented = reparented
.into_iter()
.map(|(_, timeline_id)| timeline_id)
.collect();
Ok((reparented, true))
} else {
// TODO: two-state Ok(return_value)?
Ok((Vec::new(), false))
}
}