diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 058d8b3269..d246e5e00b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -997,86 +997,110 @@ impl PageServerHandler { .expect("implementation error: timeline_handles should not be locked"); let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); - let read_message_task = tokio::spawn({ - let cancel = self.cancel.child_token(); - let ctx = ctx.attached_child(); - async move { - let mut pgb_reader = pgb_reader; - loop { - let msg = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel, - &ctx, - ) - .await?; - match requests_tx.send(msg).await { - Ok(()) => {} - Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("request processing pipeline downstream dead"); + let read_message_task = tokio::spawn( + { + let cancel = self.cancel.child_token(); + let ctx = ctx.attached_child(); + async move { + let mut pgb_reader = pgb_reader; + loop { + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel, + &ctx, + ) + .await?; + match requests_tx.send(msg).await { + Ok(()) => {} + Err(tokio::sync::mpsc::error::SendError(_)) => { + debug!("request processing pipeline downstream dead"); + break; + } + } + } + anyhow::Ok((pgb_reader, timeline_handles)) + } + } + .instrument(tracing::info_span!("reading")), + ); + + let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); + let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); + tokio::spawn( + { + let ready_for_next_batch = Arc::clone(&ready_for_next_batch); + async move { + let mut batch: Option> = None; + let mut stop = false; + while !stop { + let maybe_flush_msg = tokio::select! { + req = requests_rx.recv() => { + let arg = match req { + Some(Some(req)) => Some(req), + Some(None) => { + debug!("upstream task observed end of pagestream protocol"); + None + } + None => { + debug!("upstream task observed protocol error"); + None + } + }; + if arg.is_none() { + stop = true; + } + Self::pagestream_do_batch(&mut batch, arg).await + }, + () = ready_for_next_batch.notified() => { + debug!("downstream ready, flushing batch early"); + // pass None so the batch gets flushed + Self::pagestream_do_batch(&mut batch, None).await + } + }; + let flush_msg = match maybe_flush_msg { + None => { + debug!("not batching"); + continue; + } + Some(flush_msg) => flush_msg, + }; + debug!("flushing batch"); + match batched_tx.send(flush_msg).await { + Ok(()) => {} + Err(_) => { + debug!("batched messages consumer is gone"); + stop = true; + } + } + } + } + } + .instrument(tracing::info_span!("batching")), + ); + + loop { + let batch = match batched_rx.try_recv() { + Ok(batch) => batch, + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + ready_for_next_batch.notify_one(); + match batched_rx.recv().await { + Some(batch) => batch, + None => { + debug!( + "batched_rx observed as disconnected while waiting for next batch" + ); break; } } } - anyhow::Ok((pgb_reader, timeline_handles)) - } - }); - - let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); - let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); - tokio::spawn({ - let ready_for_next_batch = Arc::clone(&ready_for_next_batch); - async move { - let mut batch: Option> = None; - let mut stop = false; - while !stop { - let maybe_flush_msg = tokio::select! { - req = requests_rx.recv() => { - let arg = match req { - Some(Some(req)) => Some(req), - Some(None) => { - debug!("upstream task observed end of pagestream protocol"); - None - } - None => { - debug!("upstream task observed protocol error"); - None - } - }; - if arg.is_none() { - stop = true; - } - Self::pagestream_do_batch(&mut batch, arg).await - }, - () = ready_for_next_batch.notified() => { - debug!("downstream ready, flushing batch early"); - // pass None so the batch gets flushed - Self::pagestream_do_batch(&mut batch, None).await - } - }; - let flush_msg = match maybe_flush_msg { - None => { - debug!("not batching"); - continue; - } - Some(flush_msg) => flush_msg, - }; - debug!("flushing batch"); - match batched_tx.send(flush_msg).await { - Ok(()) => {} - Err(_) => { - debug!("batched messages consumer is gone"); - stop = true; - } - } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + debug!("batched_rx observed as disconnected as part of try_recv()"); + break; } - } - }); - - while let Some(batch) = batched_rx.recv().await { - ready_for_next_batch.notify_one(); + }; self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx) .await?; }