diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 445676c4ae..09cedf9523 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2063,6 +2063,7 @@ impl TenantManager {
         slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
 
+        // finally ask the restarted tenant to complete the detach
         tenant
             .ongoing_timeline_detach
             .complete(attempt, &tenant)
diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs
index fff33eee8b..f141c8510b 100644
--- a/pageserver/src/tenant/timeline/detach_ancestor.rs
+++ b/pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -131,13 +131,20 @@ impl Default for Options {
 /// of each timeline tree.
 #[derive(Default)]
 pub(crate) struct SharedState {
-    inner: std::sync::Mutex<Option<(TimelineId, completion::Barrier)>>,
+    inner: std::sync::Mutex<SharedStateInner>,
+    gc_waiting: tokio::sync::Notify,
+    attempt_waiting: tokio::sync::Notify,
 }
 
 impl SharedState {
     /// Notify an uninitialized shared state that an attempt to detach timeline ancestor continues
     /// from previous instance.
-    pub(crate) fn continue_existing_attempt(&self, _attempt: &Attempt) {}
+    pub(crate) fn continue_existing_attempt(&self, attempt: &Attempt) {
+        self.inner
+            .lock()
+            .unwrap()
+            .continue_existing_attempt(attempt);
+    }
 
     /// Only GC must be paused while a detach ancestor is ongoing. Compaction can happen, to aid
     /// with any ongoing ingestion. Compaction even after restart is ok because layers will not be
@@ -145,23 +152,40 @@ impl SharedState {
     pub(crate) fn attempt_blocks_gc(&self) -> bool {
         // if we have any started and not finished ancestor detaches, we must remain paused
         // and also let any trying to start operation know that we've paused.
-
-        // Two cases:
-        // - there is an actual attempt started
-        // - we have learned from indexparts that an attempt will be retried in near future
-        self.inner.lock().unwrap().is_some()
+        self.mark_witnessed_and_notify()
     }
 
     /// Sleep for the duration, while letting any ongoing ancestor_detach attempt know that gc has
     /// been paused.
-    ///
-    /// Cancellation safe.
     pub(crate) async fn gc_sleeping_while<T, F: std::future::Future<Output = T>>(
         &self,
         fut: F,
     ) -> T {
         // this needs to wrap the sleeping so that we can quickly let ancestor_detach continue
-        fut.await
+        let mut fut = std::pin::pin!(fut);
+
+        loop {
+            tokio::select! {
+                x = &mut fut => { return x; },
+                _ = self.gc_waiting.notified() => {
+                    self.mark_witnessed_and_notify();
+                }
+            }
+        }
+    }
+
+    fn mark_witnessed_and_notify(&self) -> bool {
+        let mut g = self.inner.lock().unwrap();
+        if let Some((_, witnessed)) = g.latest.as_mut() {
+            if !*witnessed {
+                *witnessed = true;
+                self.attempt_waiting.notify_one();
+            }
+
+            true
+        } else {
+            false
+        }
     }
 
     /// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has
@@ -173,26 +197,60 @@ impl SharedState {
             return Err(Error::ShuttingDown);
         }
 
-        let (guard, barrier) = completion::channel();
-
-        {
+        let completion = {
             let mut guard = self.inner.lock().unwrap();
-            if let Some((tl, other)) = guard.as_ref() {
-                if !other.is_ready() {
-                    return Err(Error::OtherTimelineDetachOngoing(*tl));
-                }
-            }
-            *guard = Some((detached.timeline_id, barrier));
-        }
-        // FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }"
-        // unsure if it should be awaited to upload yet...
+            let completion = guard.start_new(&detached.timeline_id)?;
+
+            // now that we changed the contents, notify any long-sleeping gc
+            self.gc_waiting.notify_one();
+
+            completion
+        };
+
+        let started_at = std::time::Instant::now();
+
+        let mut cancelled = std::pin::pin!(detached.cancel.cancelled());
+
+        loop {
+            tokio::select! {
+                _ = &mut cancelled => { return Err(Error::ShuttingDown); },
+                _ = self.attempt_waiting.notified() => {
+                    // reading a notification which was not intended for us is not a problem,
+                    // because we check if *our* progress has been witnessed by gc.
+                },
+            };
+
+            let g = self.inner.lock().unwrap();
+            if g.is_gc_paused(&detached.timeline_id) {
+                break;
+            }
+        }
 
         // finally
         let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
+        let synced_in = started_at.elapsed();
+
+        detached
+            .remote_client
+            .schedule_started_detach_ancestor_mark_and_wait()
+            .await
+            // FIXME: aaaargh
+            .map_err(|_| Error::ShuttingDown)?;
+
+        let uploaded_in = started_at.elapsed() - synced_in;
+
+        // FIXME: get rid of this logging or make it a metric or two
+        tracing::info!(
+            sync_ms = synced_in.as_millis(),
+            upload_ms = uploaded_in.as_millis(),
+            "gc paused, gate entered, and uploaded"
+        );
+
         Ok(Attempt {
-            _guard: guard,
+            timeline_id: detached.timeline_id,
+            _guard: completion,
             gate_entered: Some(gate_entered),
         })
     }
@@ -204,22 +262,198 @@ impl SharedState {
     /// Cancellation safe.
     pub(crate) async fn complete(
         &self,
-        _attempt: Attempt,
-        _tenant: &Arc<Tenant>,
+        attempt: Attempt,
+        tenant: &Arc<Tenant>,
     ) -> Result<(), Error> {
         // do we need the tenant to actually activate...? yes we do, but that can happen via
         // background task starting, because we will similarly want to confirm that the gc has
         // paused, before we unpause it?
-        //
+        // assert that such and such state has been collected
         // find the timeline the attempt represents
         // using the timelines remote client, upload an index part with completion information
+
+        {
+            let g = self.inner.lock().unwrap();
+
+            // TODO: cover the case where retry completes?
+            g.validate(&attempt);
+        }
+
+        let attempt = scopeguard::guard(attempt, |attempt| {
+            // our attempt will no longer be valid, so release it
+            self.inner.lock().unwrap().cancel(attempt);
+        });
+
+        // no failpoint needed here, because the next operation is the first mutating one
+
+        tenant
+            .wait_to_become_active(std::time::Duration::from_secs(9999))
+            .await
+            .map_err(Error::WaitToActivate)?;
+
+        let Some(timeline) = tenant
+            .timelines
+            .lock()
+            .unwrap()
+            .get(&attempt.timeline_id)
+            .cloned()
+        else {
+            unreachable!("unsure if there is an ordering, but perhaps this is possible?");
+        };
+
+        // this should be a 503 at least...?
+        fail::fail_point!(
+            "timeline-detach-ancestor::complete_before_uploading",
+            |_| Err(Error::Failpoint(
+                "timeline-detach-ancestor::complete_before_uploading"
+            ))
+        );
+
+        timeline
+            .remote_client
+            .schedule_completed_detach_ancestor_mark_and_wait()
+            .await
+            .map_err(|_| Error::ShuttingDown)?;
+
+        // now that the upload has gone through, we must remove this timeline from inprogress
+        let attempt = scopeguard::ScopeGuard::into_inner(attempt);
+
+        self.inner.lock().unwrap().complete(attempt);
+
         Ok(())
     }
 }
 
-/// Token which represents a persistent, exclusive, awaitable single attempt.
+#[derive(Default)]
+struct SharedStateInner {
+    known_ongoing: std::collections::HashSet<TimelineId>,
+    latest: Option<(ExistingAttempt, bool)>,
+}
+
+impl SharedStateInner {
+    fn continue_existing_attempt(&mut self, attempt: &Attempt) {
+        assert!(self.known_ongoing.is_empty());
+        assert!(self.latest.is_none());
+        self.known_ongoing.insert(attempt.timeline_id);
+        self.latest = Some((
+            ExistingAttempt::ContinuedOverRestart(attempt.timeline_id),
+            false,
+        ));
+    }
+
+    fn start_new(&mut self, detached: &TimelineId) -> Result<completion::Completion, Error> {
+        let completion = if let Some((existing, witnessed)) = self.latest.as_mut() {
+            let completion = existing.start_new(detached)?;
+            *witnessed = false;
+            completion
+        } else {
+            let (completion, attempt) = ExistingAttempt::new(detached);
+            self.latest = Some((attempt, false));
+            completion
+        };
+
+        self.known_ongoing.insert(*detached);
+
+        Ok(completion)
+    }
+
+    fn is_gc_paused(&self, timeline_id: &TimelineId) -> bool {
+        match &self.latest {
+            Some((ExistingAttempt::Actual(x, _), paused)) => {
+                assert_eq!(x, timeline_id);
+                *paused
+            }
+            other => {
+                unreachable!("unexpected state {other:?}")
+            }
+        }
+    }
+
+    fn validate(&self, attempt: &Attempt) {
+        match self.latest.as_ref() {
+            Some((ExistingAttempt::ContinuedOverRestart(x), _)) if x == &attempt.timeline_id => {
+                assert!(self.known_ongoing.contains(&attempt.timeline_id));
+            }
+            other => unreachable!("unexpected: {other:?}"),
+        }
+    }
+
+    fn complete(&mut self, attempt: Attempt) {
+        match self.latest.as_ref() {
+            Some((ExistingAttempt::ContinuedOverRestart(x), witnessed))
+                if x == &attempt.timeline_id =>
+            {
+                let witnessed = *witnessed;
+                assert!(self.known_ongoing.remove(&attempt.timeline_id));
+
+                if self.known_ongoing.is_empty() {
+                    self.latest = None;
+                    tracing::info!("gc is now unblocked");
+                } else {
+                    self.latest = Some((ExistingAttempt::ReadFromIndexPart, witnessed));
+                }
+            }
+            other => unreachable!("unexpected: {other:?}"),
+        }
+    }
+
+    fn cancel(&mut self, attempt: Attempt) {
+        match self.latest.as_ref() {
+            Some((ExistingAttempt::ContinuedOverRestart(x), witnessed))
+                if x == &attempt.timeline_id =>
+            {
+                let witnessed = *witnessed;
+                assert!(!self.known_ongoing.is_empty());
+                self.latest = Some((ExistingAttempt::ReadFromIndexPart, witnessed));
+            }
+            other => unreachable!("unexpected: {other:?}"),
+        }
+    }
+}
+
+#[derive(Debug)]
+enum ExistingAttempt {
+    /// Informative; there are still non-zero known ongoing timeline ancestor detaches
+    ReadFromIndexPart,
+
+    /// Exclusive lock carried over tenant reset
+    ContinuedOverRestart(TimelineId),
+
+    /// Exclusive while barrier shows the task running
+    Actual(TimelineId, completion::Barrier),
+}
+
+impl ExistingAttempt {
+    fn start_new(&mut self, detached: &TimelineId) -> Result<completion::Completion, Error> {
+        use ExistingAttempt::*;
+        match self {
+            ReadFromIndexPart => {}
+            Actual(other, barrier) if barrier.is_ready() => {
+                if other != detached {
+                    tracing::warn!(prev=%other, next=%detached, "switching ongoing detach; this is not expected to happen normally, but doesn't necessarily mean anything catastrophic");
+                }
+            }
+            Actual(other, _) | ContinuedOverRestart(other) => {
+                return Err(Error::OtherTimelineDetachOngoing(*other))
+            }
+        }
+
+        let (guard, attempt) = Self::new(detached);
+        *self = attempt;
+        Ok(guard)
+    }
+
+    fn new(detached: &TimelineId) -> (completion::Completion, Self) {
+        let (guard, barrier) = completion::channel();
+        (guard, ExistingAttempt::Actual(*detached, barrier))
+    }
+}
+
+/// Represents a single, exclusive detach ancestor attempt which persists across tenant resets.
 pub(crate) struct Attempt {
+    timeline_id: TimelineId,
+
     _guard: completion::Completion,
     gate_entered: Option<utils::sync::gate::GateGuard>,
 }
@@ -232,20 +466,36 @@ impl Attempt {
 }
 
 #[derive(Default)]
-pub(crate) struct SharedStateBuilder {}
+pub(crate) struct SharedStateBuilder {
+    inprogress: std::collections::HashSet<TimelineId>,
+}
 
 impl SharedStateBuilder {
     /// While loading, visit a timelines persistent [`crate::tenant::IndexPart`] and record if it is being
     /// detached.
     pub(crate) fn record_loading_timeline(
         &mut self,
-        _timeline_id: &TimelineId,
-        _index_part: &crate::tenant::IndexPart,
+        timeline_id: &TimelineId,
+        index_part: &crate::tenant::IndexPart,
     ) {
+        if index_part.ongoing_detach_ancestor.is_some() {
+            // if loading a timeline fails, tenant loading must fail as it does right now, or
+            // something more elaborate needs to be done with this tracking
+            self.inprogress.insert(*timeline_id);
+        }
     }
 
     /// Merge the loaded not yet deleting in-progress to the existing datastructure.
-    pub(crate) fn build(self, _target: &SharedState) {}
+    pub(crate) fn build(self, target: &SharedState) {
+        let mut g = target.inner.lock().unwrap();
+
+        assert_eq!(g.latest.is_none(), g.known_ongoing.is_empty());
+
+        g.known_ongoing.extend(self.inprogress.into_iter());
+        if g.latest.is_none() && !g.known_ongoing.is_empty() {
+            g.latest = Some((ExistingAttempt::ReadFromIndexPart, false));
+        }
+    }
 }
 
 /// See [`Timeline::prepare_to_detach_from_ancestor`]
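
To make the gc pause handshake in the new SharedState easier to review, here is a minimal,
self-contained model of it: the attempt registers itself and pokes gc_waiting, the sleeping gc
loop wakes, marks the attempt as witnessed and pokes attempt_waiting, after which the attempt
proceeds knowing gc is paused. All names and types below are simplified stand-ins (a u64
timeline id, no gate, no index_part uploads, no known_ongoing bookkeeping), not the pageserver
types themselves.

// Simplified, illustrative sketch of the handshake; not the patch's actual types.
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

#[derive(Default)]
struct Shared {
    // Some((timeline, witnessed_by_gc)) while an attempt is ongoing.
    latest: Mutex<Option<(u64, bool)>>,
    gc_waiting: Notify,
    attempt_waiting: Notify,
}

impl Shared {
    /// gc side: mark any ongoing attempt as witnessed and wake it; returns whether gc must stay paused.
    fn mark_witnessed_and_notify(&self) -> bool {
        let mut g = self.latest.lock().unwrap();
        if let Some((_, witnessed)) = g.as_mut() {
            if !*witnessed {
                *witnessed = true;
                self.attempt_waiting.notify_one();
            }
            true
        } else {
            false
        }
    }

    /// gc side: sleep, but wake early to acknowledge a newly started attempt.
    async fn gc_sleeping_while<T>(&self, fut: impl std::future::Future<Output = T>) -> T {
        let mut fut = std::pin::pin!(fut);
        loop {
            tokio::select! {
                x = &mut fut => return x,
                _ = self.gc_waiting.notified() => {
                    self.mark_witnessed_and_notify();
                }
            }
        }
    }

    /// attempt side: register, wake gc, then wait until gc has witnessed us.
    async fn start_attempt(&self, timeline: u64) {
        *self.latest.lock().unwrap() = Some((timeline, false));
        self.gc_waiting.notify_one();
        loop {
            {
                // lock only briefly; the guard is dropped before awaiting
                let g = self.latest.lock().unwrap();
                if matches!(*g, Some((_, true))) {
                    break;
                }
            }
            self.attempt_waiting.notified().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared::default());

    let gc = {
        let shared = shared.clone();
        tokio::spawn(async move {
            // stands in for the gc loop's long sleep between iterations
            shared
                .gc_sleeping_while(tokio::time::sleep(std::time::Duration::from_secs(3600)))
                .await;
        })
    };

    // returns as soon as gc acknowledges the attempt, not after an hour
    shared.start_attempt(1).await;
    println!("gc witnessed the attempt; safe to proceed");
    gc.abort();
}

The property this relies on is that tokio's Notify::notify_one stores a permit when no task is
currently waiting, so neither side can miss a wakeup even if the other has not yet reached its
.notified().await.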