diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index 7bad4f1068..0cab291d51 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -453,4 +453,38 @@ mod tests { let err = recv_task.await.unwrap().expect_err("should error"); assert!(matches!(err, RecvError::SenderGone)); } + + #[tokio::test(start_paused = true)] + async fn test_receiver_drop_while_waiting_for_receiver_to_consume_unblocks_sender() { + let (mut sender, receiver) = channel(); + + let state = receiver.state.clone(); + + sender.send((), |_, _| unreachable!()).await.unwrap(); + + assert!(matches!(&*state.value.lock().unwrap(), &State::HasData(_))); + + let unmergeable = sender.send((), |_, _| Err(())); + let mut unmergeable = std::pin::pin!(unmergeable); + tokio::select! { + _ = tokio::time::sleep(FOREVER) => {}, + _ = &mut unmergeable => { + panic!("unmergeable should not complete"); + }, + } + + assert!(matches!( + &*state.value.lock().unwrap(), + &State::SenderWaitsForReceiverToConsume(_) + )); + + drop(receiver); + + assert!(matches!( + &*state.value.lock().unwrap(), + &State::ReceiverGone + )); + + unmergeable.await.unwrap_err(); + } }