fix: resubscribe while holding the lock

This commit is contained in:
Joonas Koivunen
2023-05-05 02:40:15 +03:00
parent 022ff36481
commit fa947f0f4f

View File

@@ -1450,7 +1450,7 @@ impl Tenant {
// need to synchronize and cannot be original semi-lockless algorithm for "upload
// indexpart" part, we create a single task to delete the timeline.
let rx = {
let mut rx = {
let mut g = timeline.delete_self.lock().await;
let maybe_rx = if let Some(maybe_done) = g.as_ref() {
use timeline::MaybeDone;
@@ -1523,12 +1523,15 @@ impl Tenant {
*g = Some(timeline::MaybeDone::Pending(Arc::downgrade(&rx)));
rx
}
// subscribe while the mutex is held, so we are guaranteed to receive the message,
// unless a panic comes.
.resubscribe()
};
{
use tokio::sync::broadcast::error::RecvError;
match rx.resubscribe().recv().await {
match rx.recv().await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(DeleteTimelineError::from(&e)),
// lagged doesn't mean anything with 1 send, but whatever, handle it the same