From 849fe0f191428f2808ac12fd6023f315890aae32 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 17 Jul 2024 17:02:48 +0000 Subject: [PATCH] plumb the shared state through the api for the gc pausing is quite awkward. --- pageserver/src/http/routes.rs | 4 +- pageserver/src/tenant.rs | 17 ++- pageserver/src/tenant/mgr.rs | 17 ++- pageserver/src/tenant/tasks.rs | 25 ++-- .../src/tenant/timeline/detach_ancestor.rs | 124 ++++++++++++------ 5 files changed, 125 insertions(+), 62 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7935aeb5e9..65854d4d9b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1811,7 +1811,7 @@ async fn timeline_detach_ancestor_handler( // drop(tenant); let resp = match progress { - detach_ancestor::Progress::Prepared(_guard, prepared) => { + detach_ancestor::Progress::Prepared(attempt, prepared) => { // it would be great to tag the guard on to the tenant activation future let reparented_timelines = state .tenant_manager @@ -1819,6 +1819,7 @@ async fn timeline_detach_ancestor_handler( tenant_shard_id, timeline_id, prepared, + attempt, ctx, ) .await @@ -1832,6 +1833,7 @@ async fn timeline_detach_ancestor_handler( detach_ancestor::Progress::Done(resp) => resp, }; + // FIXME: if the ordering is really needed and not a hashset, move it here? json_response(StatusCode::OK, resp) } .instrument(span) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 43bdd9baa4..603d095e44 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -304,7 +304,7 @@ pub struct Tenant { /// /// After starting the timeline detach ancestor, blocking GC until it completes allows retrying /// the ancestor detach, until we can be certain that all reparentings have been done. - ongoing_timeline_detach: std::sync::Mutex>, + ongoing_timeline_detach: timeline::detach_ancestor::SharedState, l0_flush_global_state: L0FlushGlobalState, } @@ -986,6 +986,8 @@ impl Tenant { } } + let mut shared_state_builder = timeline::detach_ancestor::SharedStateBuilder::default(); + // For every timeline, download the metadata file, scan the local directory, // and build a layer map that contains an entry for each remote and local // layer file. @@ -995,9 +997,7 @@ impl Tenant { .remove(&timeline_id) .expect("just put it in above"); - // FIXME: collect here **any** timelines which have started and not finished - // detach_ancestor (ignoring the ones which have started deletion instead) - // then later reflect it in the Tenant::detach_ancestor whatever + shared_state_builder.record_loading_timeline(&index_part); // TODO again handle early failure self.load_remote_timeline( @@ -1043,6 +1043,8 @@ impl Tenant { // IndexPart is the source of truth. self.clean_up_timelines(&existent_timelines)?; + shared_state_builder.build(&self.ongoing_timeline_detach); + fail::fail_point!("attach-before-activate", |_| { anyhow::bail!("attach-before-activate"); }); @@ -1615,6 +1617,11 @@ impl Tenant { } } + if self.ongoing_timeline_detach.attempt_blocks_gc() { + info!("Skipping GC while there is an ongoing detach_ancestor attempt"); + return Ok(GcResult::default()); + } + self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx) .await } @@ -2622,7 +2629,7 @@ impl Tenant { &crate::metrics::tenant_throttling::TIMELINE_GET, )), tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)), - ongoing_timeline_detach: std::sync::Mutex::default(), + ongoing_timeline_detach: Default::default(), l0_flush_global_state, } } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 75c8682c97..7f9b3b7e93 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId}; use super::remote_timeline_client::remote_tenant_path; use super::secondary::SecondaryTenant; -use super::timeline::detach_ancestor::PreparedTimelineDetach; +use super::timeline::detach_ancestor::{self, PreparedTimelineDetach}; use super::TenantSharedResources; /// For a tenant that appears in TenantsMap, it may either be @@ -1968,8 +1968,10 @@ impl TenantManager { tenant_shard_id: TenantShardId, timeline_id: TimelineId, prepared: PreparedTimelineDetach, + mut attempt: detach_ancestor::Attempt, ctx: &RequestContext, ) -> Result, anyhow::Error> { + // FIXME: this is unnecessary, slotguard already has these semantics struct RevertOnDropSlot(Option); impl Drop for RevertOnDropSlot { @@ -2017,12 +2019,14 @@ impl TenantManager { let timeline = tenant.get_timeline(timeline_id, true)?; - let reparented = timeline + let resp = timeline .complete_detaching_timeline_ancestor(&tenant, prepared, ctx) .await?; let mut slot_guard = slot_guard.into_inner(); + attempt.before_shutdown(); + let (_guard, progress) = utils::completion::channel(); match tenant.shutdown(progress, ShutdownMode::Hard).await { Ok(()) => { @@ -2051,9 +2055,14 @@ impl TenantManager { ctx, ); - slot_guard.upsert(TenantSlot::Attached(tenant))?; + slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?; - Ok(reparented) + tenant + .ongoing_timeline_detach + .complete(attempt, &tenant) + .await?; + + Ok(resp) } /// A page service client sends a TenantId, and to look up the correct Tenant we must diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 7f59e54eb7..71b093b4a2 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -129,9 +129,11 @@ pub fn start_background_loops( let background_jobs_can_start = background_jobs_can_start.cloned(); async move { let cancel = task_mgr::shutdown_token(); + let can_start = completion::Barrier::maybe_wait(background_jobs_can_start); + let can_start = tenant.ongoing_timeline_detach.gc_sleeping_while(can_start); tokio::select! { _ = cancel.cancelled() => { return Ok(()) }, - _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + _ = can_start => {} }; gc_loop(tenant, cancel) .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) @@ -361,14 +363,13 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { if first { first = false; - if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel) - .await - .is_err() - { - break; - } + let delays = async { + delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?; + random_init_delay(period, &cancel).await?; + Ok::<_, Cancelled>(()) + }; - if random_init_delay(period, &cancel).await.is_err() { + if tenant.ongoing_timeline_detach.gc_sleeping_while(delays).await.is_err() { break; } } @@ -404,8 +405,8 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { let wait_duration = Duration::from_secs_f64(wait_duration); error!( - "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}", - ); + "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}", + ); wait_duration } } @@ -414,7 +415,9 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc); // Sleep - if tokio::time::timeout(sleep_duration, cancel.cancelled()) + let cancelled = cancel.cancelled(); + let cancelled = tenant.ongoing_timeline_detach.gc_sleeping_while(cancelled); + if tokio::time::timeout(sleep_duration, cancelled) .await .is_ok() { diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 54a3c33b2f..7b0af067f9 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -92,7 +92,7 @@ impl From for Error { } pub(crate) enum Progress { - Prepared(completion::Completion, PreparedTimelineDetach), + Prepared(Attempt, PreparedTimelineDetach), Done(AncestorDetached), } @@ -121,28 +121,71 @@ impl Default for Options { /// /// Currently this is tracked at tenant level, but it could be moved to be on the roots /// of each timeline tree. -struct SharedState {} +pub(crate) struct SharedState { + inner: std::sync::Mutex>, +} + +impl Default for SharedState { + fn default() -> Self { + SharedState { + inner: Default::default(), + } + } +} impl SharedState { /// Only GC must be paused while a detach ancestor is ongoing. Compaction can happen, to aid /// with any ongoing ingestion. Compaction even after restart is ok because layers will not be /// removed until the detach has been persistently completed. - /// - /// Cancellation safe. - pub(super) async fn pause_gc(&self) { + pub(crate) fn attempt_blocks_gc(&self) -> bool { // if we have any started and not finished ancestor detaches, we must remain paused // and also let any trying to start operation know that we've paused. + false + } + + /// Sleep for the duration, while letting any ongoing ancestor_detach attempt know that gc has + /// been paused. + /// + /// Cancellation safe. + pub(crate) async fn gc_sleeping_while>( + &self, + fut: F, + ) -> T { + // this needs to wrap the sleeping so that we can quickly let ancestor_detach continue + fut.await } /// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has /// been persistently paused via [`crate::tenant::IndexPart`], awaiting for completion. /// /// Cancellation safe. - async fn start_new_attempt( - &self, - _remote_client: &crate::tenant::remote_timeline_client::RemoteTimelineClient, - ) -> Result { - Err(Error::OtherTimelineDetachOngoing(TimelineId::generate())) + async fn start_new_attempt(&self, detached: &Arc) -> Result { + if detached.cancel.is_cancelled() { + return Err(Error::ShuttingDown); + } + + let (guard, barrier) = completion::channel(); + + { + let mut guard = self.inner.lock().unwrap(); + if let Some((tl, other)) = guard.as_ref() { + if !other.is_ready() { + return Err(Error::OtherTimelineDetachOngoing(*tl)); + } + } + *guard = Some((detached.timeline_id, barrier)); + } + + // FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }" + // unsure if it should be awaited to upload yet... + + // finally + let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?; + + Ok(Attempt { + _guard: guard, + gate_entered: Some(gate_entered), + }) } /// Completes a previously started detach ancestor attempt. To be called *after* the operation @@ -150,30 +193,44 @@ impl SharedState { /// can be done afterwards. /// /// Cancellation safe. - async fn complete( + pub(crate) async fn complete( &self, - _attempt: DetachAncestorAttempt, - _remote_client: &crate::tenant::remote_timeline_client::RemoteTimelineClient, + _attempt: Attempt, + _tenant: &Arc, ) -> Result<(), Error> { - Err(Error::ShuttingDown) + // do we need the tenant to actually activate...? yes we do, but that can happen via + // background task starting, because we will similarly want to confirm that the gc has + // paused, before we unpause it? + // + // assert that such and such state has been collected + // find the timeline the attempt represents + // using the timelines remote client, upload an index part with completion information + Ok(()) } } /// Token which represents a persistent, exclusive, awaitable single attempt. -struct DetachAncestorAttempt {} +pub(crate) struct Attempt { + _guard: completion::Completion, + gate_entered: Option, +} -impl DetachAncestorAttempt {} +impl Attempt { + pub(crate) fn before_shutdown(&mut self) { + let taken = self.gate_entered.take(); + assert!(taken.is_some()); + } +} -struct SharedStateBuilder {} +#[derive(Default)] +pub(crate) struct SharedStateBuilder {} impl SharedStateBuilder { /// While loading, visit a timelines persistent [`crate::tenant::IndexPart`] and record if it is being /// detached. - pub(super) fn record_loaded_timeline(&mut self, _index_part: &crate::tenant::IndexPart) {} + pub(crate) fn record_loading_timeline(&mut self, _index_part: &crate::tenant::IndexPart) {} - pub(super) fn build(self) -> Option { - None - } + pub(crate) fn build(self, _target: &SharedState) {} } /// See [`Timeline::prepare_to_detach_from_ancestor`] @@ -272,25 +329,10 @@ pub(super) async fn prepare( return Err(TooManyAncestors); } - // before we acquire the gate, we must mark the ancestor as having a detach operation - // ongoing which will block other concurrent detach operations so we don't get to ackward - // situations where there would be two branches trying to reparent earlier branches. - let (guard, barrier) = completion::channel(); - - { - let mut guard = tenant.ongoing_timeline_detach.lock().unwrap(); - if let Some((tl, other)) = guard.as_ref() { - if !other.is_ready() { - return Err(OtherTimelineDetachOngoing(*tl)); - } - } - *guard = Some((detached.timeline_id, barrier)); - } - - // FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }" - // unsure if it should be awaited to upload yet... - - let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?; + let attempt = tenant + .ongoing_timeline_detach + .start_new_attempt(detached) + .await?; utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable"); @@ -453,7 +495,7 @@ pub(super) async fn prepare( let prepared = PreparedTimelineDetach { layers: new_layers }; - Ok(Progress::Prepared(guard, prepared)) + Ok(Progress::Prepared(attempt, prepared)) } fn partition_work(