feat: block gc persistently until detach ancestor completes

This commit is contained in:
Joonas Koivunen
2024-07-25 17:44:37 +00:00
parent f9b12def0b
commit 5fc034fa7f
2 changed files with 282 additions and 31 deletions

View File

@@ -2063,6 +2063,7 @@ impl TenantManager {
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
// finally ask the restarted tenant to complete the detach
tenant
.ongoing_timeline_detach
.complete(attempt, &tenant)

View File

@@ -131,13 +131,20 @@ impl Default for Options {
/// of each timeline tree.
#[derive(Default)]
pub(crate) struct SharedState {
inner: std::sync::Mutex<Option<(TimelineId, completion::Barrier)>>,
inner: std::sync::Mutex<SharedStateInner>,
gc_waiting: tokio::sync::Notify,
attempt_waiting: tokio::sync::Notify,
}
impl SharedState {
/// Notify an uninitialized shared state that an attempt to detach timeline ancestor continues
/// from previous instance.
pub(crate) fn continue_existing_attempt(&self, _attempt: &Attempt) {}
pub(crate) fn continue_existing_attempt(&self, attempt: &Attempt) {
self.inner
.lock()
.unwrap()
.continue_existing_attempt(attempt);
}
/// Only GC must be paused while a detach ancestor is ongoing. Compaction can happen, to aid
/// with any ongoing ingestion. Compaction even after restart is ok because layers will not be
@@ -145,23 +152,40 @@ impl SharedState {
pub(crate) fn attempt_blocks_gc(&self) -> bool {
// if we have any started and not finished ancestor detaches, we must remain paused
// and also let any trying to start operation know that we've paused.
// Two cases:
// - there is an actual attempt started
// - we have learned from indexparts that an attempt will be retried in near future
self.inner.lock().unwrap().is_some()
self.mark_witnessed_and_notify()
}
/// Sleep for the duration, while letting any ongoing ancestor_detach attempt know that gc has
/// been paused.
///
/// Cancellation safe.
pub(crate) async fn gc_sleeping_while<T, F: std::future::Future<Output = T>>(
&self,
fut: F,
) -> T {
// this needs to wrap the sleeping so that we can quickly let ancestor_detach continue
fut.await
let mut fut = std::pin::pin!(fut);
loop {
tokio::select! {
x = &mut fut => { return x; },
_ = self.gc_waiting.notified() => {
self.mark_witnessed_and_notify();
}
}
}
}
fn mark_witnessed_and_notify(&self) -> bool {
let mut g = self.inner.lock().unwrap();
if let Some((_, witnessed)) = g.latest.as_mut() {
if !*witnessed {
*witnessed = true;
self.attempt_waiting.notify_one();
}
true
} else {
false
}
}
/// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has
@@ -173,26 +197,60 @@ impl SharedState {
return Err(Error::ShuttingDown);
}
let (guard, barrier) = completion::channel();
{
let completion = {
let mut guard = self.inner.lock().unwrap();
if let Some((tl, other)) = guard.as_ref() {
if !other.is_ready() {
return Err(Error::OtherTimelineDetachOngoing(*tl));
}
}
*guard = Some((detached.timeline_id, barrier));
}
// FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }"
// unsure if it should be awaited to upload yet...
let completion = guard.start_new(&detached.timeline_id)?;
// now that we changed the contents, notify any long-sleeping gc
self.gc_waiting.notify_one();
completion
};
let started_at = std::time::Instant::now();
let mut cancelled = std::pin::pin!(detached.cancel.cancelled());
loop {
tokio::select! {
_ = &mut cancelled => { return Err(Error::ShuttingDown); },
_ = self.attempt_waiting.notified() => {
// reading a notification which was not intended for us is not a problem,
// because we check if *our* progress has been witnessed by gc.
},
};
let g = self.inner.lock().unwrap();
if g.is_gc_paused(&detached.timeline_id) {
break;
}
}
// finally
let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
let synced_in = started_at.elapsed();
detached
.remote_client
.schedule_started_detach_ancestor_mark_and_wait()
.await
// FIXME: aaaargh
.map_err(|_| Error::ShuttingDown)?;
let uploaded_in = started_at.elapsed() - synced_in;
// FIXME: get rid of this logging or make it a metric or two
tracing::info!(
sync_ms = synced_in.as_millis(),
upload_ms = uploaded_in.as_millis(),
"gc paused, gate entered, and uploaded"
);
Ok(Attempt {
_guard: guard,
timeline_id: detached.timeline_id,
_guard: completion,
gate_entered: Some(gate_entered),
})
}
@@ -204,22 +262,198 @@ impl SharedState {
/// Cancellation safe.
pub(crate) async fn complete(
&self,
_attempt: Attempt,
_tenant: &Arc<Tenant>,
attempt: Attempt,
tenant: &Arc<Tenant>,
) -> Result<(), Error> {
// do we need the tenant to actually activate...? yes we do, but that can happen via
// background task starting, because we will similarly want to confirm that the gc has
// paused, before we unpause it?
//
// assert that such and such state has been collected
// find the timeline the attempt represents
// using the timelines remote client, upload an index part with completion information
{
let g = self.inner.lock().unwrap();
// TODO: cover the case where retry completes?
g.validate(&attempt);
}
let attempt = scopeguard::guard(attempt, |attempt| {
// our attempt will no longer be valid, so release it
self.inner.lock().unwrap().cancel(attempt);
});
// no failpoint needed here, because the next one is the first mutating
tenant
.wait_to_become_active(std::time::Duration::from_secs(9999))
.await
.map_err(Error::WaitToActivate)?;
let Some(timeline) = tenant
.timelines
.lock()
.unwrap()
.get(&attempt.timeline_id)
.cloned()
else {
unreachable!("unsure if there is an ordering, but perhaps this is possible?");
};
// this should be an 503 at least...?
fail::fail_point!(
"timeline-detach-ancestor::complete_before_uploading",
|_| Err(Error::Failpoint(
"timeline-detach-ancestor::complete_before_uploading"
))
);
timeline
.remote_client
.schedule_completed_detach_ancestor_mark_and_wait()
.await
.map_err(|_| Error::ShuttingDown)?;
// now that the upload has gone through, we must remove this timeline from inprogress
let attempt = scopeguard::ScopeGuard::into_inner(attempt);
self.inner.lock().unwrap().complete(attempt);
Ok(())
}
}
/// Token which represents a persistent, exclusive, awaitable single attempt.
#[derive(Default)]
struct SharedStateInner {
known_ongoing: std::collections::HashSet<TimelineId>,
latest: Option<(ExistingAttempt, bool)>,
}
impl SharedStateInner {
fn continue_existing_attempt(&mut self, attempt: &Attempt) {
assert!(self.known_ongoing.is_empty());
assert!(self.latest.is_none());
self.known_ongoing.insert(attempt.timeline_id);
self.latest = Some((
ExistingAttempt::ContinuedOverRestart(attempt.timeline_id),
false,
));
}
fn start_new(&mut self, detached: &TimelineId) -> Result<completion::Completion, Error> {
let completion = if let Some((existing, witnessed)) = self.latest.as_mut() {
let completion = existing.start_new(detached)?;
*witnessed = false;
completion
} else {
let (completion, attempt) = ExistingAttempt::new(detached);
self.latest = Some((attempt, false));
completion
};
self.known_ongoing.insert(*detached);
Ok(completion)
}
fn is_gc_paused(&self, timeline_id: &TimelineId) -> bool {
match &self.latest {
Some((ExistingAttempt::Actual(x, _), paused)) => {
assert_eq!(x, timeline_id);
*paused
}
other => {
unreachable!("unexpected state {other:?}")
}
}
}
fn validate(&self, attempt: &Attempt) {
match self.latest.as_ref() {
Some((ExistingAttempt::ContinuedOverRestart(x), _)) if x == &attempt.timeline_id => {
assert!(self.known_ongoing.contains(&attempt.timeline_id));
}
other => unreachable!("unexpected: {other:?}"),
}
}
fn complete(&mut self, attempt: Attempt) {
match self.latest.as_ref() {
Some((ExistingAttempt::ContinuedOverRestart(x), witnessed))
if x == &attempt.timeline_id =>
{
let witnessed = *witnessed;
assert!(self.known_ongoing.remove(&attempt.timeline_id));
if self.known_ongoing.is_empty() {
self.latest = None;
tracing::info!("gc is now unblocked");
} else {
self.latest = Some((ExistingAttempt::ReadFromIndexPart, witnessed));
}
}
other => unreachable!("unexpected: {other:?}"),
}
}
fn cancel(&mut self, attempt: Attempt) {
match self.latest.as_ref() {
Some((ExistingAttempt::ContinuedOverRestart(x), witnessed))
if x == &attempt.timeline_id =>
{
let witnessed = *witnessed;
assert!(!self.known_ongoing.is_empty());
self.latest = Some((ExistingAttempt::ReadFromIndexPart, witnessed));
}
other => unreachable!("unexpected: {other:?}"),
}
}
}
#[derive(Debug)]
enum ExistingAttempt {
/// Informative; there are still non-zero known ongoing timeline ancestor detaches
ReadFromIndexPart,
/// Exclusive lock carried over tenant reset
ContinuedOverRestart(TimelineId),
/// Exclusive while barrier shows the task running
Actual(TimelineId, completion::Barrier),
}
impl ExistingAttempt {
fn start_new(&mut self, detached: &TimelineId) -> Result<completion::Completion, Error> {
use ExistingAttempt::*;
match self {
ReadFromIndexPart => {}
Actual(other, barrier) if barrier.is_ready() => {
if other != detached {
tracing::warn!(prev=%other, next=%detached, "switching ongoing detach; this is not expected to happen normally, but doesn't necessarily mean anything catastrophic");
}
}
Actual(other, _) | ContinuedOverRestart(other) => {
return Err(Error::OtherTimelineDetachOngoing(*other))
}
}
let (guard, attempt) = Self::new(detached);
*self = attempt;
Ok(guard)
}
fn new(detached: &TimelineId) -> (completion::Completion, Self) {
let (guard, barrier) = completion::channel();
(guard, ExistingAttempt::Actual(*detached, barrier))
}
}
/// Represents an across tenant reset exclusive single attempt to detach ancestor.
pub(crate) struct Attempt {
timeline_id: TimelineId,
_guard: completion::Completion,
gate_entered: Option<utils::sync::gate::GateGuard>,
}
@@ -232,20 +466,36 @@ impl Attempt {
}
#[derive(Default)]
pub(crate) struct SharedStateBuilder {}
pub(crate) struct SharedStateBuilder {
inprogress: std::collections::HashSet<TimelineId>,
}
impl SharedStateBuilder {
/// While loading, visit a timelines persistent [`crate::tenant::IndexPart`] and record if it is being
/// detached.
pub(crate) fn record_loading_timeline(
&mut self,
_timeline_id: &TimelineId,
_index_part: &crate::tenant::IndexPart,
timeline_id: &TimelineId,
index_part: &crate::tenant::IndexPart,
) {
if index_part.ongoing_detach_ancestor.is_some() {
// if the loading a timeline fails, tenant loading must fail as it does right now, or
// something more elaborate needs to be done with this tracking
self.inprogress.insert(*timeline_id);
}
}
/// Merge the loaded not yet deleting in-progress to the existing datastructure.
pub(crate) fn build(self, _target: &SharedState) {}
pub(crate) fn build(self, target: &SharedState) {
let mut g = target.inner.lock().unwrap();
assert_eq!(g.latest.is_none(), g.known_ongoing.is_empty());
g.known_ongoing.extend(self.inprogress.into_iter());
if g.latest.is_none() && !g.known_ongoing.is_empty() {
g.latest = Some((ExistingAttempt::ReadFromIndexPart, false));
}
}
}
/// See [`Timeline::prepare_to_detach_from_ancestor`]