mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
spsc_fold: fix missing wakeup on receiver or sender drop while other side is waiting
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1,6 +1,6 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "RustyXML"
|
||||
@@ -7069,6 +7069,7 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"routerify",
|
||||
"scopeguard",
|
||||
"sentry",
|
||||
"serde",
|
||||
"serde_assert",
|
||||
|
||||
@@ -46,6 +46,7 @@ tracing.workspace = true
|
||||
tracing-error.workspace = true
|
||||
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
|
||||
rand.workspace = true
|
||||
scopeguard.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -115,6 +115,9 @@ impl<T: Send> Sender<T> {
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
fn drop(&mut self) {
|
||||
scopeguard::defer! {
|
||||
self.state.wake_receiver.notify()
|
||||
};
|
||||
let Ok(mut guard) = self.state.value.lock() else {
|
||||
return;
|
||||
};
|
||||
@@ -179,6 +182,9 @@ impl<T: Send> Receiver<T> {
|
||||
|
||||
impl<T> Drop for Receiver<T> {
|
||||
fn drop(&mut self) {
|
||||
scopeguard::defer! {
|
||||
self.state.wake_sender.notify()
|
||||
};
|
||||
let Ok(mut guard) = self.state.value.lock() else {
|
||||
return;
|
||||
};
|
||||
@@ -401,4 +407,46 @@ mod tests {
|
||||
let result = receiver.recv().await;
|
||||
assert!(matches!(result, Err(RecvError::SenderGone)));
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_receiver_drops_after_sender_went_to_sleep() {
|
||||
let (mut sender, receiver) = channel();
|
||||
let state = receiver.state.clone();
|
||||
|
||||
sender.send(23, |_, _| unreachable!()).await.unwrap();
|
||||
|
||||
let send_task = tokio::spawn(async move { sender.send(42, |_, v| Err(v)).await });
|
||||
|
||||
tokio::time::sleep(FOREVER).await;
|
||||
|
||||
assert!(matches!(
|
||||
&*state.value.lock().unwrap(),
|
||||
&State::SenderWaitsForReceiverToConsume(_)
|
||||
));
|
||||
|
||||
drop(receiver);
|
||||
|
||||
let err = send_task
|
||||
.await
|
||||
.unwrap()
|
||||
.expect_err("should unblock immediately");
|
||||
assert!(matches!(err, SendError::ReceiverGone));
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_sender_drops_after_receiver_went_to_sleep() {
|
||||
let (sender, mut receiver) = channel::<usize>();
|
||||
let state = sender.state.clone();
|
||||
|
||||
let recv_task = tokio::spawn(async move { receiver.recv().await });
|
||||
|
||||
tokio::time::sleep(FOREVER).await;
|
||||
|
||||
assert!(matches!(&*state.value.lock().unwrap(), &State::NoData));
|
||||
|
||||
drop(sender);
|
||||
|
||||
let err = recv_task.await.unwrap().expect_err("should error");
|
||||
assert!(matches!(err, RecvError::SenderGone));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user