mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
request coalescing with MaybeDone
This commit is contained in:
@@ -461,7 +461,7 @@ pub enum DeleteTimelineError {
|
||||
InternalFailure,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InnerDeleteTimelineError {
|
||||
// FIXME: this should be fixed by init order (either empty or from remote)
|
||||
StopUploadQueue(remote_timeline_client::StopError),
|
||||
@@ -1449,29 +1449,20 @@ impl Tenant {
|
||||
// need to synchronize and cannot be original semi-lockless algorithm for "upload
|
||||
// indexpart" part, we create a single task to delete the timeline.
|
||||
|
||||
let mut rx = {
|
||||
let rx = {
|
||||
let mut g = timeline.delete_self.lock().await;
|
||||
let maybe_rx = if let Some(rx) = g.as_ref() {
|
||||
let maybe_rx = if let Some(maybe_done) = g.as_ref() {
|
||||
use timeline::MaybeDone;
|
||||
// we got the lock, let's see if the previous attempt failed permanently
|
||||
// TODO: is there some deadlock with the lock acquisition order?
|
||||
let mut rx = rx.clone();
|
||||
let spawn_new = match &*rx.borrow_and_update() {
|
||||
Some(Ok(())) => return Ok(()),
|
||||
Some(Err(e)) if e.is_permanent() => return Err(DeleteTimelineError::from(e)),
|
||||
Some(Err(_retryable)) => true,
|
||||
// FIXME: if the task panics without getting to the send_replace, we will be
|
||||
// stuck here, so perhaps this should be a futures::future::Shared, only
|
||||
// communicate with the joinhandle return value?
|
||||
//
|
||||
// there is no test for this yet
|
||||
None => false,
|
||||
};
|
||||
|
||||
if spawn_new {
|
||||
None
|
||||
} else {
|
||||
// this cannot be returned from None arm above, because of NLL limitations.
|
||||
Some(rx)
|
||||
match maybe_done {
|
||||
MaybeDone::Done(Ok(())) => return Ok(()),
|
||||
MaybeDone::Done(Err(e)) if e.is_permanent() => {
|
||||
return Err(DeleteTimelineError::from(e))
|
||||
}
|
||||
MaybeDone::Pending(rx) => rx.upgrade(),
|
||||
MaybeDone::Done(Err(_retryable)) => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
@@ -1480,33 +1471,69 @@ impl Tenant {
|
||||
if let Some(rx) = maybe_rx {
|
||||
rx
|
||||
} else {
|
||||
// try another round
|
||||
let (tx, rx) = tokio::sync::watch::channel(None);
|
||||
// try another time
|
||||
let (tx, rx) = tokio::sync::broadcast::channel(1);
|
||||
// now anyone else racing will see the None
|
||||
*g = Some(rx.clone());
|
||||
let this = self.clone();
|
||||
let timeline = timeline.clone();
|
||||
tokio::spawn(
|
||||
|
||||
let rx = Arc::new(rx);
|
||||
|
||||
// sadly futures::future::FutureExt::shared requires clone, which we cannot give
|
||||
// for tokio::task::JoinError
|
||||
//
|
||||
// TODO: this could be tenant scoped task_mgr task?
|
||||
tokio::spawn({
|
||||
let rx = rx.clone();
|
||||
async move {
|
||||
// to uphold the MaybeDone promise, we keep the channel alive *until* we've
|
||||
// swapped the values
|
||||
let tx = tx;
|
||||
let rx = rx;
|
||||
|
||||
let res = this.unique_delete_timeline(&timeline).await;
|
||||
let _ = tx.send_replace(Some(res));
|
||||
|
||||
{
|
||||
let mut g = timeline.delete_self.lock().await;
|
||||
#[cfg(debug_assertions)]
|
||||
match g.as_ref() {
|
||||
Some(timeline::MaybeDone::Pending(weak)) => {
|
||||
let same = weak
|
||||
.upgrade()
|
||||
// we don't yet have Receiver::same_channel
|
||||
.map(|rx2| Arc::ptr_eq(&rx, &rx2))
|
||||
.unwrap_or(false);
|
||||
assert!(same, "different channel had been replaced");
|
||||
}
|
||||
other => panic!("unexpected MaybeDone: {other:?}"),
|
||||
}
|
||||
*g = Some(timeline::MaybeDone::Done(res.clone()));
|
||||
drop(rx);
|
||||
}
|
||||
|
||||
// now no one can get the Pending(weak) value to upgrade and they only see
|
||||
// the Done(res).
|
||||
|
||||
// send the result value to listeners, if any
|
||||
drop(tx.send(res));
|
||||
}
|
||||
.instrument(tracing::info_span!("unique_delete_timeline")),
|
||||
);
|
||||
.instrument(tracing::info_span!("unique_delete_timeline"))
|
||||
});
|
||||
*g = Some(timeline::MaybeDone::Pending(Arc::downgrade(&rx)));
|
||||
rx
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
rx.changed()
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::InternalFailure)?;
|
||||
{
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
if let Some(res) = &*rx.borrow_and_update() {
|
||||
return match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => Err(DeleteTimelineError::from(e)),
|
||||
};
|
||||
match rx.resubscribe().recv().await {
|
||||
Ok(Ok(())) => Ok(()),
|
||||
Ok(Err(e)) => Err(DeleteTimelineError::from(&e)),
|
||||
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
|
||||
Err(RecvError::Closed | RecvError::Lagged(_)) => {
|
||||
Err(DeleteTimelineError::InternalFailure)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,9 +232,22 @@ pub struct Timeline {
|
||||
/// New tasks can join in and await the result if they can upgrade the receiver. If they
|
||||
/// cannot, it means that previous attempt completed, but did it complete successfully if we
|
||||
/// are still reachable?
|
||||
pub(super) delete_self: tokio::sync::Mutex<
|
||||
Option<tokio::sync::watch::Receiver<Option<Result<(), super::InnerDeleteTimelineError>>>>,
|
||||
>,
|
||||
pub(super) delete_self:
|
||||
tokio::sync::Mutex<Option<MaybeDone<Result<(), super::InnerDeleteTimelineError>>>>,
|
||||
}
|
||||
|
||||
/// MaybeDone handles synchronization for multiple requests and the single actual task.
|
||||
///
|
||||
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
|
||||
/// useful `recv().await`, where useful means that a "value" or a "disconnect" arrives. If upgrade
|
||||
/// fails, this means that "disconnect" has happened in the past.
|
||||
///
|
||||
/// When the executing task runs to completion it sets this to the `Done` variant, with the actual
|
||||
/// resulting value; as used for `delete_self` that value is a `Result`, so it may be an error.
|
||||
#[derive(Debug)]
|
||||
pub(super) enum MaybeDone<V> {
|
||||
/// An attempt has been started. Request handlers try to `upgrade()` this weak handle to the
/// attempt's broadcast receiver; the spawned task keeps a strong reference alive until it has
/// stored `Done`.
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
|
||||
/// The value produced by the finished attempt.
Done(V),
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
|
||||
Reference in New Issue
Block a user