mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
fix(spsc_fold): potentially missing wakeup when send()ing in state SenderWaitsForReceiverToConsume (#10318)
# Problem Before this PR, there were cases where send() in state SenderWaitsForReceiverToConsume would never be woken up by the receiver, because it never registered with `wake_sender`. Example Scenario 1: we stop polling a send() future A that was waiting for the receiver to consume. We drop A and create a new send() future B. B would return Poll::Pending and never regsister a waker. Example Scenario 2: a send() future A transitions from HasData to SenderWaitsForReceiverToConsume. This registers the context X with `wake_sender`. But before the Receiver consumes the data, we poll A from a different context Y. The state is still SenderWaitsForReceiverToConsume, but we wouldn't register the new context with `wake_sender`. When the Receiver comes around to consume and `wake_sender.notify()`s, it wakes the old context X instead of Y. # Fix Register the waker in the case where we're polled in state `SenderWaitsForReceiverToConsume`. # Relation to #10309 I found this bug while investigating #10309. There was never proof that this bug here is the root cause for #10309. In the meantime we found a more probably hypothesis for the root cause than what is being fixed here. Regardless, let's walk through my thought process about how it might have been relevant: There (in page_service), Scenario 1 does not apply because we poll the send() future to completion. Scenario 2 (`tokio::join!`) also does not apply with the current `tokio::join!()` impl, because it will just poll each future every time, each with the same context. Although if we ever used something like a FuturesUnordered anywhere, that will be using a different context, so, in that case, the bug might materialize. Regarding tokio & spurious poll in general: @conradludgate is not aware of any spurious wakeup cases in current tokio, but within a `tokio::join!()`, any wake meant for one future will poll all the futures, so that can appear as a spurious wake up to the N-1 futures of the `tokio::join!()`.
This commit is contained in:
committed by
GitHub
parent
735c66dc65
commit
db00eb41a1
@@ -96,7 +96,11 @@ impl<T: Send> Sender<T> {
|
||||
}
|
||||
}
|
||||
State::SenderWaitsForReceiverToConsume(_data) => {
|
||||
// Really, we shouldn't be polled until receiver has consumed and wakes us.
|
||||
// SAFETY: send is single threaded due to `&mut self` requirement,
|
||||
// therefore register is not concurrent.
|
||||
unsafe {
|
||||
self.state.wake_sender.register(cx.waker());
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
|
||||
|
||||
Reference in New Issue
Block a user