diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 01b137f858..efbb8342ba 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1023,7 +1023,18 @@ impl PageServerHandler { ) .await } - None => self.handle_pagerequests_serial(pgb_reader).await, + None => { + self.handle_pagerequests_serial( + pgb, + pgb_reader, + tenant_id, + timeline_id, + timeline_handles, + request_span, + &ctx, + ) + .await + } }?; debug!("pagestream subprotocol shut down cleanly"); @@ -1039,12 +1050,38 @@ impl PageServerHandler { async fn handle_pagerequests_serial( &mut self, - pgb_reader: PostgresBackendReader, + pgb_writer: &mut PostgresBackend, + mut pgb_reader: PostgresBackendReader, + tenant_id: TenantId, + timeline_id: TimelineId, + mut timeline_handles: TimelineHandles, + request_span: Span, + ctx: &RequestContext, ) -> Result<(PostgresBackendReader, TimelineHandles), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { - todo!() + loop { + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &self.cancel, + &ctx, + request_span.clone(), + ) + .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + return Ok((pgb_reader, timeline_handles)); + } + }; + self.pagesteam_handle_batched_message(pgb_writer, *msg, &ctx) + .await?; + } } async fn handle_pagerequests_pipelined(