mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
plumb the shared state through
the api for the gc pausing is quite awkward.
This commit is contained in:
@@ -1811,7 +1811,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
// drop(tenant);
|
||||
|
||||
let resp = match progress {
|
||||
detach_ancestor::Progress::Prepared(_guard, prepared) => {
|
||||
detach_ancestor::Progress::Prepared(attempt, prepared) => {
|
||||
// it would be great to tag the guard on to the tenant activation future
|
||||
let reparented_timelines = state
|
||||
.tenant_manager
|
||||
@@ -1819,6 +1819,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
prepared,
|
||||
attempt,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1832,6 +1833,7 @@ async fn timeline_detach_ancestor_handler(
|
||||
detach_ancestor::Progress::Done(resp) => resp,
|
||||
};
|
||||
|
||||
// FIXME: if the ordering is really needed and not a hashset, move it here?
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
.instrument(span)
|
||||
|
||||
@@ -304,7 +304,7 @@ pub struct Tenant {
|
||||
///
|
||||
/// After starting the timeline detach ancestor, blocking GC until it completes allows retrying
|
||||
/// the ancestor detach, until we can be certain that all reparentings have been done.
|
||||
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
|
||||
ongoing_timeline_detach: timeline::detach_ancestor::SharedState,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
}
|
||||
@@ -986,6 +986,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let mut shared_state_builder = timeline::detach_ancestor::SharedStateBuilder::default();
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
@@ -995,9 +997,7 @@ impl Tenant {
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
// FIXME: collect here **any** timelines which have started and not finished
|
||||
// detach_ancestor (ignoring the ones which have started deletion instead)
|
||||
// then later reflect it in the Tenant::detach_ancestor whatever
|
||||
shared_state_builder.record_loading_timeline(&index_part);
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
@@ -1043,6 +1043,8 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
shared_state_builder.build(&self.ongoing_timeline_detach);
|
||||
|
||||
fail::fail_point!("attach-before-activate", |_| {
|
||||
anyhow::bail!("attach-before-activate");
|
||||
});
|
||||
@@ -1615,6 +1617,11 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
if self.ongoing_timeline_detach.attempt_blocks_gc() {
|
||||
info!("Skipping GC while there is an ongoing detach_ancestor attempt");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
|
||||
.await
|
||||
}
|
||||
@@ -2622,7 +2629,7 @@ impl Tenant {
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
ongoing_timeline_detach: Default::default(),
|
||||
l0_flush_global_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::remote_timeline_client::remote_tenant_path;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::timeline::detach_ancestor::PreparedTimelineDetach;
|
||||
use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -1968,8 +1968,10 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
prepared: PreparedTimelineDetach,
|
||||
mut attempt: detach_ancestor::Attempt,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
// FIXME: this is unnecessary, slotguard already has these semantics
|
||||
struct RevertOnDropSlot(Option<SlotGuard>);
|
||||
|
||||
impl Drop for RevertOnDropSlot {
|
||||
@@ -2017,12 +2019,14 @@ impl TenantManager {
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let reparented = timeline
|
||||
let resp = timeline
|
||||
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
|
||||
.await?;
|
||||
|
||||
let mut slot_guard = slot_guard.into_inner();
|
||||
|
||||
attempt.before_shutdown();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
@@ -2051,9 +2055,14 @@ impl TenantManager {
|
||||
ctx,
|
||||
);
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
|
||||
|
||||
Ok(reparented)
|
||||
tenant
|
||||
.ongoing_timeline_detach
|
||||
.complete(attempt, &tenant)
|
||||
.await?;
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// A page service client sends a TenantId, and to look up the correct Tenant we must
|
||||
|
||||
@@ -129,9 +129,11 @@ pub fn start_background_loops(
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let can_start = completion::Barrier::maybe_wait(background_jobs_can_start);
|
||||
let can_start = tenant.ongoing_timeline_detach.gc_sleeping_while(can_start);
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
_ = can_start => {}
|
||||
};
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
@@ -361,14 +363,13 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
if first {
|
||||
first = false;
|
||||
|
||||
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
let delays = async {
|
||||
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
|
||||
random_init_delay(period, &cancel).await?;
|
||||
Ok::<_, Cancelled>(())
|
||||
};
|
||||
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
if tenant.ongoing_timeline_detach.gc_sleeping_while(delays).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -404,8 +405,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
|
||||
error!(
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
}
|
||||
@@ -414,7 +415,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
let cancelled = cancel.cancelled();
|
||||
let cancelled = tenant.ongoing_timeline_detach.gc_sleeping_while(cancelled);
|
||||
if tokio::time::timeout(sleep_duration, cancelled)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
||||
@@ -92,7 +92,7 @@ impl From<FlushLayerError> for Error {
|
||||
}
|
||||
|
||||
pub(crate) enum Progress {
|
||||
Prepared(completion::Completion, PreparedTimelineDetach),
|
||||
Prepared(Attempt, PreparedTimelineDetach),
|
||||
Done(AncestorDetached),
|
||||
}
|
||||
|
||||
@@ -121,28 +121,71 @@ impl Default for Options {
|
||||
///
|
||||
/// Currently this is tracked at tenant level, but it could be moved to be on the roots
|
||||
/// of each timeline tree.
|
||||
struct SharedState {}
|
||||
pub(crate) struct SharedState {
|
||||
inner: std::sync::Mutex<Option<(TimelineId, completion::Barrier)>>,
|
||||
}
|
||||
|
||||
impl Default for SharedState {
|
||||
fn default() -> Self {
|
||||
SharedState {
|
||||
inner: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
/// Only GC must be paused while a detach ancestor is ongoing. Compaction can happen, to aid
|
||||
/// with any ongoing ingestion. Compaction even after restart is ok because layers will not be
|
||||
/// removed until the detach has been persistently completed.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(super) async fn pause_gc(&self) {
|
||||
pub(crate) fn attempt_blocks_gc(&self) -> bool {
|
||||
// if we have any started and not finished ancestor detaches, we must remain paused
|
||||
// and also let any trying to start operation know that we've paused.
|
||||
false
|
||||
}
|
||||
|
||||
/// Sleep for the duration, while letting any ongoing ancestor_detach attempt know that gc has
|
||||
/// been paused.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn gc_sleeping_while<T, F: std::future::Future<Output = T>>(
|
||||
&self,
|
||||
fut: F,
|
||||
) -> T {
|
||||
// this needs to wrap the sleeping so that we can quickly let ancestor_detach continue
|
||||
fut.await
|
||||
}
|
||||
|
||||
/// Acquire the exclusive lock for a new detach ancestor attempt and ensure that GC task has
|
||||
/// been persistently paused via [`crate::tenant::IndexPart`], awaiting for completion.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
async fn start_new_attempt(
|
||||
&self,
|
||||
_remote_client: &crate::tenant::remote_timeline_client::RemoteTimelineClient,
|
||||
) -> Result<completion::Completion, Error> {
|
||||
Err(Error::OtherTimelineDetachOngoing(TimelineId::generate()))
|
||||
async fn start_new_attempt(&self, detached: &Arc<Timeline>) -> Result<Attempt, Error> {
|
||||
if detached.cancel.is_cancelled() {
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
|
||||
let (guard, barrier) = completion::channel();
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some((tl, other)) = guard.as_ref() {
|
||||
if !other.is_ready() {
|
||||
return Err(Error::OtherTimelineDetachOngoing(*tl));
|
||||
}
|
||||
}
|
||||
*guard = Some((detached.timeline_id, barrier));
|
||||
}
|
||||
|
||||
// FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }"
|
||||
// unsure if it should be awaited to upload yet...
|
||||
|
||||
// finally
|
||||
let gate_entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
|
||||
|
||||
Ok(Attempt {
|
||||
_guard: guard,
|
||||
gate_entered: Some(gate_entered),
|
||||
})
|
||||
}
|
||||
|
||||
/// Completes a previously started detach ancestor attempt. To be called *after* the operation
|
||||
@@ -150,30 +193,44 @@ impl SharedState {
|
||||
/// can be done afterwards.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
async fn complete(
|
||||
pub(crate) async fn complete(
|
||||
&self,
|
||||
_attempt: DetachAncestorAttempt,
|
||||
_remote_client: &crate::tenant::remote_timeline_client::RemoteTimelineClient,
|
||||
_attempt: Attempt,
|
||||
_tenant: &Arc<Tenant>,
|
||||
) -> Result<(), Error> {
|
||||
Err(Error::ShuttingDown)
|
||||
// do we need the tenant to actually activate...? yes we do, but that can happen via
|
||||
// background task starting, because we will similarly want to confirm that the gc has
|
||||
// paused, before we unpause it?
|
||||
//
|
||||
// assert that such and such state has been collected
|
||||
// find the timeline the attempt represents
|
||||
// using the timelines remote client, upload an index part with completion information
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Token which represents a persistent, exclusive, awaitable single attempt.
|
||||
struct DetachAncestorAttempt {}
|
||||
pub(crate) struct Attempt {
|
||||
_guard: completion::Completion,
|
||||
gate_entered: Option<utils::sync::gate::GateGuard>,
|
||||
}
|
||||
|
||||
impl DetachAncestorAttempt {}
|
||||
impl Attempt {
|
||||
pub(crate) fn before_shutdown(&mut self) {
|
||||
let taken = self.gate_entered.take();
|
||||
assert!(taken.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
struct SharedStateBuilder {}
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SharedStateBuilder {}
|
||||
|
||||
impl SharedStateBuilder {
|
||||
/// While loading, visit a timelines persistent [`crate::tenant::IndexPart`] and record if it is being
|
||||
/// detached.
|
||||
pub(super) fn record_loaded_timeline(&mut self, _index_part: &crate::tenant::IndexPart) {}
|
||||
pub(crate) fn record_loading_timeline(&mut self, _index_part: &crate::tenant::IndexPart) {}
|
||||
|
||||
pub(super) fn build(self) -> Option<SharedState> {
|
||||
None
|
||||
}
|
||||
pub(crate) fn build(self, _target: &SharedState) {}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
@@ -272,25 +329,10 @@ pub(super) async fn prepare(
|
||||
return Err(TooManyAncestors);
|
||||
}
|
||||
|
||||
// before we acquire the gate, we must mark the ancestor as having a detach operation
|
||||
// ongoing which will block other concurrent detach operations so we don't get to ackward
|
||||
// situations where there would be two branches trying to reparent earlier branches.
|
||||
let (guard, barrier) = completion::channel();
|
||||
|
||||
{
|
||||
let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
|
||||
if let Some((tl, other)) = guard.as_ref() {
|
||||
if !other.is_ready() {
|
||||
return Err(OtherTimelineDetachOngoing(*tl));
|
||||
}
|
||||
}
|
||||
*guard = Some((detached.timeline_id, barrier));
|
||||
}
|
||||
|
||||
// FIXME: modify the index part to have a "detach-ancestor: inprogress { started_at }"
|
||||
// unsure if it should be awaited to upload yet...
|
||||
|
||||
let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
|
||||
let attempt = tenant
|
||||
.ongoing_timeline_detach
|
||||
.start_new_attempt(detached)
|
||||
.await?;
|
||||
|
||||
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
|
||||
|
||||
@@ -453,7 +495,7 @@ pub(super) async fn prepare(
|
||||
|
||||
let prepared = PreparedTimelineDetach { layers: new_layers };
|
||||
|
||||
Ok(Progress::Prepared(guard, prepared))
|
||||
Ok(Progress::Prepared(attempt, prepared))
|
||||
}
|
||||
|
||||
fn partition_work(
|
||||
|
||||
Reference in New Issue
Block a user