diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b7cc054789..92d4e44d7a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -461,7 +461,7 @@ pub enum DeleteTimelineError { InternalFailure, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum InnerDeleteTimelineError { // FIXME: this should be fixed by init order (either empty or from remote) StopUploadQueue(remote_timeline_client::StopError), @@ -1449,29 +1449,20 @@ impl Tenant { // need to synchronize and cannot be original semi-lockless algorithm for "upload // indexpart" part, we create a single task to delete the timeline. - let mut rx = { + let rx = { let mut g = timeline.delete_self.lock().await; - let maybe_rx = if let Some(rx) = g.as_ref() { + let maybe_rx = if let Some(maybe_done) = g.as_ref() { + use timeline::MaybeDone; // we got the lock, let's see if the previous attempt failed permanently // TODO: is here some deadlock with the lock acquisition order? - let mut rx = rx.clone(); - let spawn_new = match &*rx.borrow_and_update() { - Some(Ok(())) => return Ok(()), - Some(Err(e)) if e.is_permanent() => return Err(DeleteTimelineError::from(e)), - Some(Err(_retryable)) => true, - // FIXME: if the task panics without getting to the send_replace, we will be - // stuck here, so perhaps this should be a futures::future::Shared, only - // communicate with the joinhandle return value? - // - // there is no test for this yet - None => false, - }; - if spawn_new { - None - } else { - // this cannot be returned from None arm above, because NLL limitations. 
- Some(rx) + match maybe_done { + MaybeDone::Done(Ok(())) => return Ok(()), + MaybeDone::Done(Err(e)) if e.is_permanent() => { + return Err(DeleteTimelineError::from(e)) + } + MaybeDone::Pending(rx) => rx.upgrade(), + MaybeDone::Done(Err(_retryable)) => None, } } else { None @@ -1480,33 +1471,69 @@ impl Tenant { if let Some(rx) = maybe_rx { rx } else { - // try another round - let (tx, rx) = tokio::sync::watch::channel(None); + // try another time + let (tx, rx) = tokio::sync::broadcast::channel(1); // now anyone else racing will see the None - *g = Some(rx.clone()); let this = self.clone(); let timeline = timeline.clone(); - tokio::spawn( + + let rx = Arc::new(rx); + + // sadly futures::future::FutureExt::shared requires clone, which we cannot give + // for tokio::task::JoinError + // + // TODO: this could be tenant scoped task_mgr task? + tokio::spawn({ + let rx = rx.clone(); async move { + // to uphold the MaybeDone promise, we keep the channel alive *until* we've + // swapped the values + let tx = tx; + let rx = rx; + let res = this.unique_delete_timeline(&timeline).await; - let _ = tx.send_replace(Some(res)); + + { + let mut g = timeline.delete_self.lock().await; + #[cfg(debug_assertions)] + match g.as_ref() { + Some(timeline::MaybeDone::Pending(weak)) => { + let same = weak + .upgrade() + // we don't yet have Receiver::same_channel + .map(|rx2| Arc::ptr_eq(&rx, &rx2)) + .unwrap_or(false); + assert!(same, "different channel had been replaced"); + } + other => panic!("unexpected MaybeDone: {other:?}"), + } + *g = Some(timeline::MaybeDone::Done(res.clone())); + drop(rx); + } + + // now no one can get the Pending(weak) value to upgrade and they only see + // the Done(res). 
+ + // send the result value to listeners, if any + drop(tx.send(res)); } - .instrument(tracing::info_span!("unique_delete_timeline")), - ); + .instrument(tracing::info_span!("unique_delete_timeline")) + }); + *g = Some(timeline::MaybeDone::Pending(Arc::downgrade(&rx))); rx } }; - loop { - rx.changed() - .await - .map_err(|_| DeleteTimelineError::InternalFailure)?; + { + use tokio::sync::broadcast::error::RecvError; - if let Some(res) = &*rx.borrow_and_update() { - return match res { - Ok(()) => Ok(()), - Err(e) => Err(DeleteTimelineError::from(e)), - }; + match rx.resubscribe().recv().await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(DeleteTimelineError::from(&e)), + // lagged doesn't mean anything with 1 send, but whatever, handle it the same + Err(RecvError::Closed | RecvError::Lagged(_)) => { + Err(DeleteTimelineError::InternalFailure) + } } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f81d36ab43..0cd82a9bdf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -232,9 +232,22 @@ pub struct Timeline { /// New tasks can join in and await for the result if they can upgrade the receiver. If they /// cannot, it means that previous attempt completed, but did it complete successfully if we /// are still reachable? - pub(super) delete_self: tokio::sync::Mutex< - Option<tokio::sync::watch::Receiver<Option<Result<(), super::InnerDeleteTimelineError>>>>, - >, + pub(super) delete_self: + tokio::sync::Mutex<Option<MaybeDone<Result<(), super::InnerDeleteTimelineError>>>>, +} + +/// MaybeDone handles synchronization for multiple requests and the single actual task. +/// +/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a +/// useful `recv().await`, where useful means "value" or "disconnect" arrives. If upgrade fails, +/// this means that "disconnect" has happened in the past. +/// +/// Once the one executing task finishes, it will set this to the `Done` variant, with the actual +/// resulting value (success or error). 
+#[derive(Debug)] +pub(super) enum MaybeDone<V> { + Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>), + Done(V), } /// Internal structure to hold all data needed for logical size calculation.