impmlement the serial mode

This commit is contained in:
Christian Schwarz
2024-11-22 09:53:08 +01:00
parent 0fa8ae3c0a
commit 093674b2fb

View File

@@ -1023,7 +1023,18 @@ impl PageServerHandler {
)
.await
}
None => self.handle_pagerequests_serial(pgb_reader).await,
None => {
self.handle_pagerequests_serial(
pgb,
pgb_reader,
tenant_id,
timeline_id,
timeline_handles,
request_span,
&ctx,
)
.await
}
}?;
debug!("pagestream subprotocol shut down cleanly");
@@ -1039,12 +1050,38 @@ impl PageServerHandler {
async fn handle_pagerequests_serial<IO>(
&mut self,
pgb_reader: PostgresBackendReader<IO>,
pgb_writer: &mut PostgresBackend<IO>,
mut pgb_reader: PostgresBackendReader<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
mut timeline_handles: TimelineHandles,
request_span: Span,
ctx: &RequestContext,
) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
todo!()
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&self.cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
return Ok((pgb_reader, timeline_handles));
}
};
self.pagesteam_handle_batched_message(pgb_writer, *msg, &ctx)
.await?;
}
}
async fn handle_pagerequests_pipelined<IO>(