diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 196e2c0e4b..01b137f858 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1003,13 +1003,65 @@ impl PageServerHandler { .split() .context("implementation error: split pgb into reader and writer")?; - let mut timeline_handles = self + let timeline_handles = self .timeline_handles .take() .expect("implementation error: timeline_handles should not be locked"); - let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); let request_span = info_span!("request", shard_id = tracing::field::Empty); + let (pgb_reader, timeline_handles) = match self.pipelining_config { + Some(PageServicePipeliningConfig { max_batch_size, .. }) => { + self.handle_pagerequests_pipelined( + pgb, + pgb_reader, + tenant_id, + timeline_id, + timeline_handles, + request_span, + max_batch_size, + &ctx, + ) + .await + } + None => self.handle_pagerequests_serial(pgb_reader).await, + }?; + + debug!("pagestream subprotocol shut down cleanly"); + + pgb.unsplit(pgb_reader) + .context("implementation error: unsplit pgb")?; + + let replaced = self.timeline_handles.replace(timeline_handles); + assert!(replaced.is_none()); + + Ok(()) + } + + async fn handle_pagerequests_serial( + &mut self, + pgb_reader: PostgresBackendReader, + ) -> Result<(PostgresBackendReader, TimelineHandles), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, + { + todo!() + } + + async fn handle_pagerequests_pipelined( + &mut self, + pgb_writer: &mut PostgresBackend, + pgb_reader: PostgresBackendReader, + tenant_id: TenantId, + timeline_id: TimelineId, + mut timeline_handles: TimelineHandles, + request_span: Span, + max_batch_size: NonZeroUsize, + ctx: &RequestContext, + ) -> Result<(PostgresBackendReader, TimelineHandles), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, + { + let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); let read_messages = { let cancel = self.cancel.child_token(); let ctx = ctx.attached_child(); @@ -1067,11 +1119,6 @@ impl PageServerHandler { let notify_batcher = Arc::new(tokio::sync::Notify::new()); let batcher = { let notify_batcher = notify_batcher.clone(); - let max_batch_size = self - .pipelining_config - .as_ref() - .map(|PageServicePipeliningConfig { max_batch_size, .. }| *max_batch_size) - .unwrap_or(NonZeroUsize::new(1).unwrap()); async move { scopeguard::defer! { debug!("exiting"); @@ -1147,7 +1194,7 @@ impl PageServerHandler { }; notify_batcher.notify_one(); debug!("processing batch"); - self.pagesteam_handle_batched_message(pgb, *batch, &ctx) + self.pagesteam_handle_batched_message(pgb_writer, *batch, &ctx) .await?; } Ok(()) @@ -1156,25 +1203,13 @@ impl PageServerHandler { let (read_message_task_res, _, executor_res): (_, (), _) = tokio::join!(read_messages, batcher, executor); - let (pgb_reader, timeline_handles) = match (read_message_task_res, executor_res) { - (_, Err(e)) => { + match (read_message_task_res, executor_res) { + (Err(e), _) | (_, Err(e)) => { + let e: QueryError = e; return Err(e); } - (Err(e), _) => { - return Err(e); - } - (Ok((pgb_reader, timeline_handles)), Ok(())) => (pgb_reader, timeline_handles), - }; - - debug!("pagestream subprotocol shut down cleanly"); - - pgb.unsplit(pgb_reader) - .context("implementation error: unsplit pgb")?; - - let replaced = self.timeline_handles.replace(timeline_handles); - assert!(replaced.is_none()); - - Ok(()) + (Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)), + } } /// Helper function to handle the LSN from client request.