From ce83418aa49ac363d0af02b5ee943c2bf9cf0207 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 29 Nov 2024 20:54:28 +0100 Subject: [PATCH] spsc_fold: fix missing wakeup on receiver or sender drop while other side is waiting --- Cargo.lock | 3 +- libs/utils/Cargo.toml | 1 + libs/utils/src/sync/spsc_fold.rs | 48 ++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index b104a35bf5..9af7e124dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "RustyXML" @@ -7069,6 +7069,7 @@ dependencies = [ "rand 0.8.5", "regex", "routerify", + "scopeguard", "sentry", "serde", "serde_assert", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index a52d953d66..5648072a83 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -46,6 +46,7 @@ tracing.workspace = true tracing-error.workspace = true tracing-subscriber = { workspace = true, features = ["json", "registry"] } rand.workspace = true +scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true url.workspace = true diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index a33f8097fc..b44f766ef0 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -115,6 +115,9 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { + scopeguard::defer! { + self.state.wake_receiver.notify() + }; let Ok(mut guard) = self.state.value.lock() else { return; }; @@ -179,6 +182,9 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { + scopeguard::defer! { + self.state.wake_sender.notify() + }; let Ok(mut guard) = self.state.value.lock() else { return; }; @@ -401,4 +407,46 @@ mod tests { let result = receiver.recv().await; assert!(matches!(result, Err(RecvError::SenderGone))); } + + #[tokio::test(start_paused = true)] + async fn test_receiver_drops_after_sender_went_to_sleep() { + let (mut sender, receiver) = channel(); + let state = receiver.state.clone(); + + sender.send(23, |_, _| unreachable!()).await.unwrap(); + + let send_task = tokio::spawn(async move { sender.send(42, |_, v| Err(v)).await }); + + tokio::time::sleep(FOREVER).await; + + assert!(matches!( + &*state.value.lock().unwrap(), + &State::SenderWaitsForReceiverToConsume(_) + )); + + drop(receiver); + + let err = send_task + .await + .unwrap() + .expect_err("should unblock immediately"); + assert!(matches!(err, SendError::ReceiverGone)); + } + + #[tokio::test(start_paused = true)] + async fn test_sender_drops_after_receiver_went_to_sleep() { + let (sender, mut receiver) = channel::(); + let state = sender.state.clone(); + + let recv_task = tokio::spawn(async move { receiver.recv().await }); + + tokio::time::sleep(FOREVER).await; + + assert!(matches!(&*state.value.lock().unwrap(), &State::NoData)); + + drop(sender); + + let err = recv_task.await.unwrap().expect_err("should error"); + assert!(matches!(err, RecvError::SenderGone)); + } }