From db00eb41a1f27ce5d122fbdfbf3f958e5df9b79f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 10 Jan 2025 12:06:03 +0100 Subject: [PATCH] fix(spsc_fold): potentially missing wakeup when send()ing in state `SenderWaitsForReceiverToConsume` (#10318) # Problem Before this PR, there were cases where send() in state SenderWaitsForReceiverToConsume would never be woken up by the receiver, because it never registered with `wake_sender`. Example Scenario 1: we stop polling a send() future A that was waiting for the receiver to consume. We drop A and create a new send() future B. B would return Poll::Pending and never regsister a waker. Example Scenario 2: a send() future A transitions from HasData to SenderWaitsForReceiverToConsume. This registers the context X with `wake_sender`. But before the Receiver consumes the data, we poll A from a different context Y. The state is still SenderWaitsForReceiverToConsume, but we wouldn't register the new context with `wake_sender`. When the Receiver comes around to consume and `wake_sender.notify()`s, it wakes the old context X instead of Y. # Fix Register the waker in the case where we're polled in state `SenderWaitsForReceiverToConsume`. # Relation to #10309 I found this bug while investigating #10309. There was never proof that this bug here is the root cause for #10309. In the meantime we found a more probably hypothesis for the root cause than what is being fixed here. Regardless, let's walk through my thought process about how it might have been relevant: There (in page_service), Scenario 1 does not apply because we poll the send() future to completion. Scenario 2 (`tokio::join!`) also does not apply with the current `tokio::join!()` impl, because it will just poll each future every time, each with the same context. Although if we ever used something like a FuturesUnordered anywhere, that will be using a different context, so, in that case, the bug might materialize. Regarding tokio & spurious poll in general: @conradludgate is not aware of any spurious wakeup cases in current tokio, but within a `tokio::join!()`, any wake meant for one future will poll all the futures, so that can appear as a spurious wake up to the N-1 futures of the `tokio::join!()`. --- libs/utils/src/sync/spsc_fold.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index b44f766ef0..7bad4f1068 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -96,7 +96,11 @@ impl Sender { } } State::SenderWaitsForReceiverToConsume(_data) => { - // Really, we shouldn't be polled until receiver has consumed and wakes us. + // SAFETY: send is single threaded due to `&mut self` requirement, + // therefore register is not concurrent. + unsafe { + self.state.wake_sender.register(cx.waker()); + } Poll::Pending } State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),