fix ready_for_next_batch order

This commit is contained in:
Christian Schwarz
2024-11-21 19:11:57 +01:00
parent aa1032aeff
commit 345f8b6c3b

View File

@@ -997,86 +997,110 @@ impl PageServerHandler {
.expect("implementation error: timeline_handles should not be locked");
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let read_message_task = tokio::spawn({
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
)
.await?;
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("request processing pipeline downstream dead");
let read_message_task = tokio::spawn(
{
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
)
.await?;
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("request processing pipeline downstream dead");
break;
}
}
}
anyhow::Ok((pgb_reader, timeline_handles))
}
}
.instrument(tracing::info_span!("reading")),
);
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(
{
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
async move {
let mut batch: Option<Box<BatchedFeMessage>> = None;
let mut stop = false;
while !stop {
let maybe_flush_msg = tokio::select! {
req = requests_rx.recv() => {
let arg = match req {
Some(Some(req)) => Some(req),
Some(None) => {
debug!("upstream task observed end of pagestream protocol");
None
}
None => {
debug!("upstream task observed protocol error");
None
}
};
if arg.is_none() {
stop = true;
}
Self::pagestream_do_batch(&mut batch, arg).await
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early");
// pass None so the batch gets flushed
Self::pagestream_do_batch(&mut batch, None).await
}
};
let flush_msg = match maybe_flush_msg {
None => {
debug!("not batching");
continue;
}
Some(flush_msg) => flush_msg,
};
debug!("flushing batch");
match batched_tx.send(flush_msg).await {
Ok(()) => {}
Err(_) => {
debug!("batched messages consumer is gone");
stop = true;
}
}
}
}
}
.instrument(tracing::info_span!("batching")),
);
loop {
let batch = match batched_rx.try_recv() {
Ok(batch) => batch,
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
ready_for_next_batch.notify_one();
match batched_rx.recv().await {
Some(batch) => batch,
None => {
debug!(
"batched_rx observed as disconnected while waiting for next batch"
);
break;
}
}
}
anyhow::Ok((pgb_reader, timeline_handles))
}
});
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn({
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
async move {
let mut batch: Option<Box<BatchedFeMessage>> = None;
let mut stop = false;
while !stop {
let maybe_flush_msg = tokio::select! {
req = requests_rx.recv() => {
let arg = match req {
Some(Some(req)) => Some(req),
Some(None) => {
debug!("upstream task observed end of pagestream protocol");
None
}
None => {
debug!("upstream task observed protocol error");
None
}
};
if arg.is_none() {
stop = true;
}
Self::pagestream_do_batch(&mut batch, arg).await
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early");
// pass None so the batch gets flushed
Self::pagestream_do_batch(&mut batch, None).await
}
};
let flush_msg = match maybe_flush_msg {
None => {
debug!("not batching");
continue;
}
Some(flush_msg) => flush_msg,
};
debug!("flushing batch");
match batched_tx.send(flush_msg).await {
Ok(()) => {}
Err(_) => {
debug!("batched messages consumer is gone");
stop = true;
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
debug!("batched_rx observed as disconnected as part of try_recv()");
break;
}
}
});
while let Some(batch) = batched_rx.recv().await {
ready_for_next_batch.notify_one();
};
self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx)
.await?;
}