WIP refactor to allow truly serial mode

This commit is contained in:
Christian Schwarz
2024-11-22 09:47:49 +01:00
parent c1040bc25d
commit 0fa8ae3c0a

View File

@@ -1003,13 +1003,65 @@ impl PageServerHandler {
.split()
.context("implementation error: split pgb into reader and writer")?;
let mut timeline_handles = self
let timeline_handles = self
.timeline_handles
.take()
.expect("implementation error: timeline_handles should not be locked");
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let (pgb_reader, timeline_handles) = match self.pipelining_config {
Some(PageServicePipeliningConfig { max_batch_size, .. }) => {
self.handle_pagerequests_pipelined(
pgb,
pgb_reader,
tenant_id,
timeline_id,
timeline_handles,
request_span,
max_batch_size,
&ctx,
)
.await
}
None => self.handle_pagerequests_serial(pgb_reader).await,
}?;
debug!("pagestream subprotocol shut down cleanly");
pgb.unsplit(pgb_reader)
.context("implementation error: unsplit pgb")?;
let replaced = self.timeline_handles.replace(timeline_handles);
assert!(replaced.is_none());
Ok(())
}
async fn handle_pagerequests_serial<IO>(
&mut self,
pgb_reader: PostgresBackendReader<IO>,
) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
todo!()
}
async fn handle_pagerequests_pipelined<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
pgb_reader: PostgresBackendReader<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
mut timeline_handles: TimelineHandles,
request_span: Span,
max_batch_size: NonZeroUsize,
ctx: &RequestContext,
) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let read_messages = {
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
@@ -1067,11 +1119,6 @@ impl PageServerHandler {
let notify_batcher = Arc::new(tokio::sync::Notify::new());
let batcher = {
let notify_batcher = notify_batcher.clone();
let max_batch_size = self
.pipelining_config
.as_ref()
.map(|PageServicePipeliningConfig { max_batch_size, .. }| *max_batch_size)
.unwrap_or(NonZeroUsize::new(1).unwrap());
async move {
scopeguard::defer! {
debug!("exiting");
@@ -1147,7 +1194,7 @@ impl PageServerHandler {
};
notify_batcher.notify_one();
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
self.pagesteam_handle_batched_message(pgb_writer, *batch, &ctx)
.await?;
}
Ok(())
@@ -1156,25 +1203,13 @@ impl PageServerHandler {
let (read_message_task_res, _, executor_res): (_, (), _) =
tokio::join!(read_messages, batcher, executor);
let (pgb_reader, timeline_handles) = match (read_message_task_res, executor_res) {
(_, Err(e)) => {
match (read_message_task_res, executor_res) {
(Err(e), _) | (_, Err(e)) => {
let e: QueryError = e;
return Err(e);
}
(Err(e), _) => {
return Err(e);
}
(Ok((pgb_reader, timeline_handles)), Ok(())) => (pgb_reader, timeline_handles),
};
debug!("pagestream subprotocol shut down cleanly");
pgb.unsplit(pgb_reader)
.context("implementation error: unsplit pgb")?;
let replaced = self.timeline_handles.replace(timeline_handles);
assert!(replaced.is_none());
Ok(())
(Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)),
}
}
/// Helper function to handle the LSN from client request.