diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 3e01887355..928472fc4c 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -3,7 +3,6 @@
 use anyhow::{bail, Context};
 use async_compression::tokio::write::GzipEncoder;
-use async_timer::Timer;
 use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
@@ -17,13 +16,14 @@ use pageserver_api::models::{
     PagestreamProtocolVersion,
 };
 use pageserver_api::shard::TenantShardId;
-use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
+use postgres_backend::{
+    is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
+};
 use pq_proto::framed::ConnectionError;
 use pq_proto::FeStartupPacket;
 use pq_proto::{BeMessage, FeMessage, RowDescriptor};
 use std::borrow::Cow;
 use std::io;
-use std::pin::Pin;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -258,7 +258,7 @@ async fn page_service_conn_main(
     // a while: we will tear down this PageServerHandler and instantiate a new one if/when
     // they reconnect.
     socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
-    let socket = std::pin::pin!(socket);
+    let socket = Box::pin(socket);
 
     fail::fail_point!("ps::connection-start::pre-login");
 
@@ -285,7 +285,7 @@ async fn page_service_conn_main(
                 info!("Postgres client disconnected ({io_error})");
                 Ok(())
             } else {
-                let tenant_id = conn_handler.timeline_handles.tenant_id();
+                let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
                 Err(io_error).context(format!(
                     "Postgres connection error for tenant_id={:?} client at peer_addr={}",
                     tenant_id, peer_addr
@@ -293,7 +293,7 @@ async fn page_service_conn_main(
             }
         }
         other => {
-            let tenant_id = conn_handler.timeline_handles.tenant_id();
+            let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
             other.context(format!(
                 "Postgres query error for tenant_id={:?} client peer_addr={}",
                 tenant_id, peer_addr
@@ -314,17 +314,8 @@ struct PageServerHandler {
 
     cancel: CancellationToken,
 
-    timeline_handles: TimelineHandles,
-
-    /// See [`PageServerConf::server_side_batch_timeout`]
-    server_side_batch_timeout: Option<Duration>,
-
-    server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
-}
-
-struct Carry {
-    msg: BatchedFeMessage,
-    started_at: Instant,
+    /// None only while pagestream protocol is being processed.
+    timeline_handles: Option<TimelineHandles>,
 }
 
 struct TimelineHandles {
@@ -567,13 +558,6 @@ enum BatchedFeMessage {
     },
 }
 
-enum BatchOrEof {
-    /// In the common case, this has one entry.
-    /// At most, it has two entries: the first is the leftover batch, the second is an error.
-    Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>),
-    Eof,
-}
-
 impl PageServerHandler {
     pub fn new(
         tenant_manager: Arc<TenantManager>,
@@ -586,10 +570,8 @@ impl PageServerHandler {
             auth,
             claims: None,
             connection_ctx,
-            timeline_handles: TimelineHandles::new(tenant_manager),
+            timeline_handles: Some(TimelineHandles::new(tenant_manager)),
             cancel,
-            server_side_batch_timeout,
-            server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
         }
     }
 
@@ -617,255 +599,359 @@ impl PageServerHandler {
         )
     }
 
-    #[instrument(skip_all, level = tracing::Level::TRACE)]
-    async fn read_batch_from_connection<IO>(
-        &mut self,
-        pgb: &mut PostgresBackend<IO>,
-        tenant_id: &TenantId,
-        timeline_id: &TimelineId,
-        maybe_carry: &mut Option<Carry>,
+    async fn pagestream_read_message<IO>(
+        pgb: &mut PostgresBackendReader<IO>,
+        ref tenant_id: TenantId,
+        ref timeline_id: TimelineId,
+        timeline_handles: &mut TimelineHandles,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> Result<BatchOrEof, QueryError>
+    ) -> Result<Option<Box<BatchedFeMessage>>, QueryError>
+    where
+        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
+    {
+        let msg = tokio::select! {
+            biased;
+            _ = cancel.cancelled() => {
+                return Err(QueryError::Shutdown)
+            }
+            msg = pgb.read_message() => { msg }
+        };
+
+        // Parse the message into a BatchedFeMessage here. Whether it can be
+        // merged into an existing batch is decided later, in
+        // pagestream_do_batch, which may also carry it over to the next call.
+        let copy_data_bytes = match msg? {
+            Some(FeMessage::CopyData(bytes)) => bytes,
+            Some(FeMessage::Terminate) => {
+                return Ok(None);
+            }
+            Some(m) => {
+                return Err(QueryError::Other(anyhow::anyhow!(
+                    "unexpected message: {m:?} during COPY"
+                )));
+            }
+            None => {
+                return Ok(None);
+            } // client disconnected
+        };
+        trace!("query: {copy_data_bytes:?}");
+
+        fail::fail_point!("ps::handle-pagerequest-message");
+
+        // parse request
+        let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
+
+        let batched_msg = match neon_fe_msg {
+            PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
+                span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
+                req,
+            },
+            PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
+                span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
+                req,
+            },
+            PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
+                span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
+                req,
+            },
+            PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
+                span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
+                req,
+            },
+            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
+                request_lsn,
+                not_modified_since,
+                rel,
+                blkno,
+            }) => {
+                // shard_id is filled in by the handler
+                let span = tracing::info_span!(
+                    "handle_get_page_at_lsn_request_batched",
+                    %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
+                    batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
+                );
+
+                macro_rules! respond_error {
+                    ($error:expr) => {{
+                        let error = BatchedFeMessage::RespondError {
+                            span,
+                            error: $error,
+                        };
+                        Ok(Some(Box::new(error)))
+                    }};
+                }
+
+                let key = rel_block_to_key(rel, blkno);
+                let shard = match timeline_handles
+                    .get(*tenant_id, *timeline_id, ShardSelector::Page(key))
+                    .instrument(span.clone())
+                    .await
+                {
+                    Ok(tl) => tl,
+                    Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
+                        // We already know this tenant exists in general, because we resolved it at
+                        // start of connection. Getting a NotFound here indicates that the shard containing
+                        // the requested page is not present on this node: the client's knowledge of shard->pageserver
+                        // mapping is out of date.
+                        //
+                        // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting above message, via
+                        // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
+                        // and talk to a different pageserver.
+                        return respond_error!(PageStreamError::Reconnect(
+                            "getpage@lsn request routed to wrong shard".into()
+                        ));
+                    }
+                    Err(e) => {
+                        return respond_error!(e.into());
+                    }
+                };
+                let effective_request_lsn = match Self::wait_or_get_last_lsn(
+                    &shard,
+                    request_lsn,
+                    not_modified_since,
+                    &shard.get_latest_gc_cutoff_lsn(),
+                    ctx,
+                )
+                // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
+                .await
+                {
+                    Ok(lsn) => lsn,
+                    Err(e) => {
+                        return respond_error!(e);
+                    }
+                };
+                BatchedFeMessage::GetPage {
+                    span,
+                    shard,
+                    effective_request_lsn,
+                    pages: smallvec::smallvec![(rel, blkno)],
+                }
+            }
+        };
+        Ok(Some(Box::new(batched_msg)))
+    }
+
+    #[instrument(skip_all, level = tracing::Level::TRACE)]
+    async fn pagestream_do_batch(
+        maybe_carry: &mut Option<Box<BatchedFeMessage>>,
+        arg: Option<Box<BatchedFeMessage>>,
+        cancel: &CancellationToken,
+        ctx: &RequestContext,
+    ) -> Option<Box<BatchedFeMessage>> {
+        debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
+
+        match (maybe_carry.as_deref_mut(), arg.map(|x| *x)) {
+            (None, Some(arg)) => {
+                *maybe_carry = Some(Box::new(arg)); // TODO: avoid this boxing
+                None
+            }
+            (
+                Some(BatchedFeMessage::GetPage {
+                    span: _,
+                    shard: accum_shard,
+                    pages: ref mut accum_pages,
+                    effective_request_lsn: accum_lsn,
+                }),
+                // would be nice to have box pattern here
+                Some(BatchedFeMessage::GetPage {
+                    span: _,
+                    shard: this_shard,
+                    pages: this_pages,
+                    effective_request_lsn: this_lsn,
+                }),
+            ) if async {
+                assert_eq!(this_pages.len(), 1);
+                if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
+                    trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
+                    assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
+                    return false;
+                }
+                if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
+                    != (this_shard.tenant_shard_id, this_shard.timeline_id)
+                {
+                    trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
+                    // TODO: we _could_ batch & execute each shard separately (and in parallel).
+                    // But the current logic for keeping responses in order does not support that.
+                    return false;
+                }
+                // the vectored get currently only supports a single LSN, so, bounce as soon
+                // as the effective request_lsn changes
+                if *accum_lsn != this_lsn {
+                    trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
+                    return false;
+                }
+                true
+            }
+            .await =>
+            {
+                // ok to batch
+                accum_pages.extend(this_pages);
+                None
+            }
+            (Some(_), Some(this_msg)) => {
+                // by default, don't continue batching
+                let this_msg = Box::new(this_msg); // TODO: avoid this box
+                let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()");
+                Some(carry)
+            }
+            (Some(_), None) => {
+                Some(maybe_carry.take().expect("this match arm checks it's Some()"))
+            }
+            (None, None) => {
+                None // TODO: can we prevent this branch through the type system?
+            }
+        }
+    }
+
+    async fn pagestream_handle_batched_message<IO>(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        pgb_writer: &mut PostgresBackend<IO>,
+        batch: BatchedFeMessage,
+        ctx: &RequestContext,
+    ) -> Result<(), QueryError>
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
-        debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
-
-        let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
-
-        loop {
-            // Create a future that will become ready when we need to stop batching.
-            use futures::future::Either;
-            let batching_deadline = match (
-                &*maybe_carry as &Option<Carry>,
-                &mut batching_deadline_storage,
-            ) {
-                (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
-                (None, Some(_)) => unreachable!(),
-                (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
-                (Some(carry), None) => {
-                    match self.server_side_batch_timeout {
-                        None => {
-                            return Ok(BatchOrEof::Batch(smallvec::smallvec![
-                                maybe_carry
-                                    .take()
-                                    .expect("we already checked it's Some")
-                                    .msg
-                            ]))
-                        }
-                        Some(batch_timeout) => {
-                            // Take into consideration the time the carry spent waiting.
-                            let batch_timeout =
-                                batch_timeout.saturating_sub(carry.started_at.elapsed());
-                            if batch_timeout.is_zero() {
-                                // the timer doesn't support restarting with zero duration
-                                return Ok(BatchOrEof::Batch(smallvec::smallvec![
-                                    maybe_carry
-                                        .take()
-                                        .expect("we already checked it's Some")
-                                        .msg
-                                ]));
-                            } else {
-                                self.server_side_batch_timer.restart(batch_timeout);
-                                batching_deadline_storage = Some(&mut self.server_side_batch_timer);
-                                Either::Right(
-                                    batching_deadline_storage.as_mut().expect("we just set it"),
-                                )
-                            }
-                        }
-                    }
-                }
-            };
-            let msg = tokio::select! {
-                biased;
-                _ = self.cancel.cancelled() => {
-                    return Err(QueryError::Shutdown)
-                }
-                _ = batching_deadline => {
-                    return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
-                }
-                msg = pgb.read_message() => { msg }
-            };
-
-            let msg_start = Instant::now();
-
-            // Rest of this loop body is trying to batch `msg` into `batch`.
-            // If we can add msg to batch we continue into the next loop iteration.
-            // If we can't add msg to batch batch, we carry `msg` over to the next call.
-
-            let copy_data_bytes = match msg? {
-                Some(FeMessage::CopyData(bytes)) => bytes,
-                Some(FeMessage::Terminate) => {
-                    return Ok(BatchOrEof::Eof);
-                }
-                Some(m) => {
-                    return Err(QueryError::Other(anyhow::anyhow!(
-                        "unexpected message: {m:?} during COPY"
-                    )));
-                }
-                None => {
-                    return Ok(BatchOrEof::Eof);
-                } // client disconnected
-            };
-            trace!("query: {copy_data_bytes:?}");
-
-            fail::fail_point!("ps::handle-pagerequest-message");
-
-            // parse request
-            let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
-
-            let this_msg = match neon_fe_msg {
-                PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
-                    span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
-                    req,
-                },
-                PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
-                    span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
-                    req,
-                },
-                PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
-                    span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
-                    req,
-                },
-                PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
-                    span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
-                    req,
-                },
-                PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
-                    request_lsn,
-                    not_modified_since,
-                    rel,
-                    blkno,
-                }) => {
-                    // shard_id is filled in by the handler
-                    let span = tracing::info_span!(
-                        "handle_get_page_at_lsn_request_batched",
-                        %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
-                        batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
-                    );
-
-                    macro_rules! current_batch_and_error {
-                        ($error:expr) => {{
-                            let error = BatchedFeMessage::RespondError {
-                                span,
-                                error: $error,
-                            };
-                            let batch_and_error = match maybe_carry.take() {
-                                Some(carry) => smallvec::smallvec![carry.msg, error],
-                                None => smallvec::smallvec![error],
-                            };
-                            Ok(BatchOrEof::Batch(batch_and_error))
-                        }};
-                    }
-
-                    let key = rel_block_to_key(rel, blkno);
-                    let shard = match self
-                        .timeline_handles
-                        .get(*tenant_id, *timeline_id, ShardSelector::Page(key))
-                        .instrument(span.clone())
-                        .await
-                    {
-                        Ok(tl) => tl,
-                        Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
-                            // We already know this tenant exists in general, because we resolved it at
-                            // start of connection. Getting a NotFound here indicates that the shard containing
-                            // the requested page is not present on this node: the client's knowledge of shard->pageserver
-                            // mapping is out of date.
-                            //
-                            // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
-                            // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
-                            // and talk to a different pageserver.
-                            return current_batch_and_error!(PageStreamError::Reconnect(
-                                "getpage@lsn request routed to wrong shard".into()
-                            ));
-                        }
-                        Err(e) => {
-                            return current_batch_and_error!(e.into());
-                        }
-                    };
-                    let effective_request_lsn = match Self::wait_or_get_last_lsn(
-                        &shard,
-                        request_lsn,
-                        not_modified_since,
-                        &shard.get_latest_gc_cutoff_lsn(),
-                        ctx,
-                    )
-                    // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
-                    .await
-                    {
-                        Ok(lsn) => lsn,
-                        Err(e) => {
-                            return current_batch_and_error!(e);
-                        }
-                    };
-                    BatchedFeMessage::GetPage {
-                        span,
-                        shard,
-                        effective_request_lsn,
-                        pages: smallvec::smallvec![(rel, blkno)],
-                    }
-                }
-            };
+        // invoke handler function
+        let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
+            match batch {
+                BatchedFeMessage::Exists { span, req } => {
+                    fail::fail_point!("ps::handle-pagerequest-message::exists");
+                    (
+                        vec![
+                            self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
+                                .instrument(span.clone())
+                                .await,
+                        ],
+                        span,
+                    )
+                }
+                BatchedFeMessage::Nblocks { span, req } => {
+                    fail::fail_point!("ps::handle-pagerequest-message::nblocks");
+                    (
+                        vec![
+                            self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
+                                .instrument(span.clone())
+                                .await,
+                        ],
+                        span,
+                    )
+                }
+                BatchedFeMessage::GetPage {
+                    span,
+                    shard,
+                    effective_request_lsn,
+                    pages,
+                } => {
+                    fail::fail_point!("ps::handle-pagerequest-message::getpage");
+                    (
+                        {
+                            let npages = pages.len();
+                            trace!(npages, "handling getpage request");
+                            let res = self
+                                .handle_get_page_at_lsn_request_batched(
+                                    &shard,
+                                    effective_request_lsn,
+                                    pages,
+                                    &ctx,
+                                )
+                                .instrument(span.clone())
+                                .await;
+                            assert_eq!(res.len(), npages);
+                            res
+                        },
+                        span,
+                    )
+                }
+                BatchedFeMessage::DbSize { span, req } => {
+                    fail::fail_point!("ps::handle-pagerequest-message::dbsize");
+                    (
+                        vec![
+                            self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
+                                .instrument(span.clone())
+                                .await,
+                        ],
+                        span,
+                    )
+                }
+                BatchedFeMessage::GetSlruSegment { span, req } => {
+                    fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
+                    (
+                        vec![
+                            self.handle_get_slru_segment_request(
+                                tenant_id,
+                                timeline_id,
+                                &req,
+                                &ctx,
+                            )
+                            .instrument(span.clone())
+                            .await,
+                        ],
+                        span,
+                    )
+                }
+                BatchedFeMessage::RespondError { span, error } => {
+                    // We've already decided to respond with an error, so we don't need to
+                    // call the handler.
+                    (vec![Err(error)], span)
+                }
+            };
-
-            //
-            // batch
-            //
-            match (maybe_carry.as_mut(), this_msg) {
-                (None, this_msg) => {
-                    *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
-                }
-                (
-                    Some(Carry { msg: BatchedFeMessage::GetPage {
-                        span: _,
-                        shard: accum_shard,
-                        pages: ref mut accum_pages,
-                        effective_request_lsn: accum_lsn,
-                    }, started_at: _}),
-                    BatchedFeMessage::GetPage {
-                        span: _,
-                        shard: this_shard,
-                        pages: this_pages,
-                        effective_request_lsn: this_lsn,
-                    },
-                ) if async {
-                    assert_eq!(this_pages.len(), 1);
-                    if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
-                        trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
-                        assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
-                        return false;
+        // Map handler result to protocol behavior.
+        // Some handler errors cause exit from pagestream protocol.
+        // Other handler errors are sent back as an error message and we stay in pagestream protocol.
-                    }
-                    if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
-                        != (this_shard.tenant_shard_id, this_shard.timeline_id)
-                    {
-                        trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
-                        // TODO: we _could_ batch & execute each shard seperately (and in parallel).
-                        // But the current logic for keeping responses in order does not support that.
-                        return false;
-                    }
-                    // the vectored get currently only supports a single LSN, so, bounce as soon
-                    // as the effective request_lsn changes
-                    if *accum_lsn != this_lsn {
-                        trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
-                        return false;
-                    }
-                    true
-                }
-                .await =>
-                {
-                    // ok to batch
-                    accum_pages.extend(this_pages);
-                }
-                (Some(carry), this_msg) => {
-                    // by default, don't continue batching
-                    let carry = std::mem::replace(carry,
-                        Carry {
-                            msg: this_msg,
-                            started_at: msg_start,
-                        });
-                    return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
-                }
-            }
-        }
+        for handler_result in handler_results {
+            let response_msg = match handler_result {
+                Err(e) => match &e {
+                    PageStreamError::Shutdown => {
+                        // If we fail to fulfil a request during shutdown, which may be _because_ of
+                        // shutdown, then do not send the error to the client. Instead just drop the
+                        // connection.
+                        span.in_scope(|| info!("dropping connection due to shutdown"));
+                        return Err(QueryError::Shutdown);
+                    }
+                    PageStreamError::Reconnect(reason) => {
+                        span.in_scope(|| info!("handler requested reconnect: {reason}"));
+                        return Err(QueryError::Reconnect);
+                    }
+                    PageStreamError::Read(_)
+                    | PageStreamError::LsnTimeout(_)
+                    | PageStreamError::NotFound(_)
+                    | PageStreamError::BadRequest(_) => {
+                        // print all the details to the log with {:#}, but for the client the
+                        // error message is enough. Do not log if shutting down, as the anyhow::Error
+                        // here includes cancellation which is not an error.
+                        let full = utils::error::report_compact_sources(&e);
+                        span.in_scope(|| {
+                            error!("error reading relation or page version: {full:#}")
+                        });
+                        PagestreamBeMessage::Error(PagestreamErrorResponse {
+                            message: e.to_string(),
+                        })
+                    }
+                },
+                Ok(response_msg) => response_msg,
+            };
+
+            // marshal & transmit response message
+            pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
+        }
+        tokio::select! {
+            biased;
+            _ = self.cancel.cancelled() => {
+                // We were requested to shut down.
+                info!("shutdown request received in page handler");
+                return Err(QueryError::Shutdown)
+            }
+            res = pgb_writer.flush() => {
+                res?;
+            }
+        }
+        Ok(())
     }
 
     /// Pagestream sub-protocol handler.
@@ -887,7 +973,7 @@ impl PageServerHandler {
         ctx: RequestContext,
     ) -> Result<(), QueryError>
     where
-        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
+        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
 
@@ -903,164 +989,122 @@ impl PageServerHandler {
             }
         }
 
-        let mut carry: Option<Carry> = None;
-
-        loop {
-            let maybe_batched = self
-                .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
-                .await?;
-            let batched = match maybe_batched {
-                BatchOrEof::Batch(b) => b,
-                BatchOrEof::Eof => {
-                    break;
-                }
-            };
+        let pgb_reader = pgb
+            .split()
+            .context("implementation error: split pgb into reader and writer")?;
+
+        let mut timeline_handles = self
+            .timeline_handles
+            .take()
+            .expect("implementation error: timeline_handles should not be locked");
+
+        let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
+        let read_message_task = tokio::spawn({
+            let cancel = self.cancel.child_token();
+            let ctx = ctx.attached_child();
+            async move {
+                let mut pgb_reader = pgb_reader;
+                loop {
+                    let msg = Self::pagestream_read_message(
+                        &mut pgb_reader,
+                        tenant_id,
+                        timeline_id,
+                        &mut timeline_handles,
+                        &cancel,
+                        &ctx,
+                    )
+                    .await?;
+                    match requests_tx.send(msg).await {
+                        Ok(()) => {}
+                        Err(tokio::sync::mpsc::error::SendError(_)) => {
+                            debug!("request processing pipeline downstream dead");
+                            break;
+                        }
+                    }
+                }
+                anyhow::Ok((pgb_reader, timeline_handles))
+            }
+        });
 
-            for batch in batched {
-                // invoke handler function
-                let (handler_results, span): (
-                    Vec<Result<PagestreamBeMessage, PageStreamError>>,
-                    _,
-                ) = match batch {
-                    BatchedFeMessage::Exists { span, req } => {
-                        fail::fail_point!("ps::handle-pagerequest-message::exists");
-                        (
-                            vec![
-                                self.handle_get_rel_exists_request(
-                                    tenant_id,
-                                    timeline_id,
-                                    &req,
-                                    &ctx,
-                                )
-                                .instrument(span.clone())
-                                .await,
-                            ],
-                            span,
-                        )
-                    }
-                    BatchedFeMessage::Nblocks { span, req } => {
-                        fail::fail_point!("ps::handle-pagerequest-message::nblocks");
-                        (
-                            vec![
-                                self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
-                                    .instrument(span.clone())
-                                    .await,
-                            ],
-                            span,
-                        )
-                    }
-                    BatchedFeMessage::GetPage {
-                        span,
-                        shard,
-                        effective_request_lsn,
-                        pages,
-                    } => {
-                        fail::fail_point!("ps::handle-pagerequest-message::getpage");
-                        (
-                            {
-                                let npages = pages.len();
-                                trace!(npages, "handling getpage request");
-                                let res = self
-                                    .handle_get_page_at_lsn_request_batched(
-                                        &shard,
-                                        effective_request_lsn,
-                                        pages,
-                                        &ctx,
-                                    )
-                                    .instrument(span.clone())
-                                    .await;
-                                assert_eq!(res.len(), npages);
-                                res
-                            },
-                            span,
-                        )
-                    }
-                    BatchedFeMessage::DbSize { span, req } => {
-                        fail::fail_point!("ps::handle-pagerequest-message::dbsize");
-                        (
-                            vec![
-                                self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
-                                    .instrument(span.clone())
-                                    .await,
-                            ],
-                            span,
-                        )
-                    }
-                    BatchedFeMessage::GetSlruSegment { span, req } => {
-                        fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
-                        (
-                            vec![
-                                self.handle_get_slru_segment_request(
-                                    tenant_id,
-                                    timeline_id,
-                                    &req,
-                                    &ctx,
-                                )
-                                .instrument(span.clone())
-                                .await,
-                            ],
-                            span,
-                        )
-                    }
-                    BatchedFeMessage::RespondError { span, error } => {
-                        // We've already decided to respond with an error, so we don't need to
-                        // call the handler.
-                        (vec![Err(error)], span)
-                    }
-                };
-
-                // Map handler result to protocol behavior.
-                // Some handler errors cause exit from pagestream protocol.
-                // Other handler errors are sent back as an error message and we stay in pagestream protocol.
-                for handler_result in handler_results {
-                    let response_msg = match handler_result {
-                        Err(e) => match &e {
-                            PageStreamError::Shutdown => {
-                                // If we fail to fulfil a request during shutdown, which may be _because_ of
-                                // shutdown, then do not send the error to the client. Instead just drop the
-                                // connection.
-                                span.in_scope(|| info!("dropping connection due to shutdown"));
-                                return Err(QueryError::Shutdown);
-                            }
-                            PageStreamError::Reconnect(reason) => {
-                                span.in_scope(|| info!("handler requested reconnect: {reason}"));
-                                return Err(QueryError::Reconnect);
-                            }
-                            PageStreamError::Read(_)
-                            | PageStreamError::LsnTimeout(_)
-                            | PageStreamError::NotFound(_)
-                            | PageStreamError::BadRequest(_) => {
-                                // print the all details to the log with {:#}, but for the client the
-                                // error message is enough. Do not log if shutting down, as the anyhow::Error
-                                // here includes cancellation which is not an error.
-                                let full = utils::error::report_compact_sources(&e);
-                                span.in_scope(|| {
-                                    error!("error reading relation or page version: {full:#}")
-                                });
-                                PagestreamBeMessage::Error(PagestreamErrorResponse {
-                                    message: e.to_string(),
-                                })
-                            }
-                        },
-                        Ok(response_msg) => response_msg,
-                    };
-
-                    // marshal & transmit response message
-                    pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
-                }
-                tokio::select! {
-                    biased;
-                    _ = self.cancel.cancelled() => {
-                        // We were requested to shut down.
-                        info!("shutdown request received in page handler");
-                        return Err(QueryError::Shutdown)
-                    }
-                    res = pgb.flush() => {
-                        res?;
-                    }
-                }
-            }
-        }
+        let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
+        let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
+        tokio::spawn({
+            let cancel = self.cancel.child_token();
+            let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
+            let ctx = ctx.attached_child();
+            async move {
+                let mut batch: Option<Box<BatchedFeMessage>> = None;
+                let mut stop = false;
+                while !stop {
+                    let maybe_flush_msg = tokio::select! {
+                        req = requests_rx.recv() => {
+                            let arg = match req {
+                                Some(Some(req)) => Some(req),
+                                Some(None) => {
+                                    debug!("upstream task observed end of pagestream protocol");
+                                    None
+                                }
+                                None => {
+                                    debug!("upstream task observed protocol error");
+                                    None
+                                }
+                            };
+                            if arg.is_none() {
+                                stop = true;
+                            }
+                            Self::pagestream_do_batch(&mut batch, arg, &cancel, &ctx).await
+                        },
+                        () = ready_for_next_batch.notified() => {
+                            debug!("downstream ready, flushing batch early");
+                            // pass None so the batch gets flushed
+                            Self::pagestream_do_batch(&mut batch, None, &cancel, &ctx).await
+                        }
+                    };
+                    let flush_msg = match maybe_flush_msg {
+                        None => {
+                            debug!("not batching");
+                            continue;
+                        }
+                        Some(flush_msg) => flush_msg,
+                    };
+                    debug!("flushing batch");
+                    match batched_tx.send(flush_msg).await {
+                        Ok(()) => {}
+                        Err(_) => {
+                            debug!("batched messages consumer is gone");
+                            stop = true;
+                        }
+                    }
+                }
+            }
+        });
+
+        while let Some(batch) = batched_rx.recv().await {
+            ready_for_next_batch.notify_one();
+            self.pagestream_handle_batched_message(
+                tenant_id,
+                timeline_id,
+                pgb,
+                *batch,
+                &ctx,
+            )
+            .await?;
+        }
+
+        let (pgb_reader, timeline_handles) = read_message_task
+            .await
+            .context("read message task panicked")?
+            // if the client made a protocol error, this is where we bubble up the QueryError
+            ?;
+
+        debug!("pagestream subprotocol shut down cleanly");
+
+        pgb.unsplit(pgb_reader)
+            .context("implementation error: unsplit pgb")?;
+
+        let replaced = self.timeline_handles.replace(timeline_handles);
+        assert!(replaced.is_none());
+
         Ok(())
     }
 
@@ -1164,6 +1208,8 @@ impl PageServerHandler {
     {
         let timeline = self
            .timeline_handles
+            .as_mut()
+            .unwrap()
            .get(
                tenant_shard_id.tenant_id,
                timeline_id,
@@ -1205,6 +1251,8 @@ impl PageServerHandler {
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self
             .timeline_handles
+            .as_mut()
+            .unwrap()
             .get(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
         let _timer = timeline
@@ -1240,6 +1288,8 @@ impl PageServerHandler {
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self
             .timeline_handles
+            .as_mut()
+            .unwrap()
             .get(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
 
@@ -1276,6 +1326,8 @@ impl PageServerHandler {
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self
             .timeline_handles
+            .as_mut()
+            .unwrap()
             .get(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
 
@@ -1340,6 +1392,8 @@ impl PageServerHandler {
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self
             .timeline_handles
+            .as_mut()
+            .unwrap()
             .get(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
 
@@ -1407,6 +1461,8 @@ impl PageServerHandler {
 
         let timeline = self
             .timeline_handles
+            .as_mut()
+            .unwrap()
             .get(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
 
@@ -1749,7 +1805,7 @@ impl PageServiceCmd {
 
 impl<IO> postgres_backend::Handler<IO> for PageServerHandler
 where
-    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
+    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
 {
     fn check_auth_jwt(
         &mut self,
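
For orientation, below is a minimal, self-contained sketch of the three-stage shape this patch introduces: a reader task parses requests off the connection, a batcher task coalesces them, and the connection task executes batches, with a Notify letting the executor request an early flush when it runs dry. This is illustrative only, not pageserver code: MAX_BATCH stands in for Timeline::MAX_GET_VECTORED_KEYS, a u64 stands in for a parsed getpage request, and spans, cancellation, and error handling are omitted. It assumes the tokio crate with the "macros", "rt-multi-thread", and "sync" features. Note that the real pagestream_do_batch merges only GetPage requests that agree on shard and effective request LSN; this sketch merges everything.

use std::sync::Arc;

use tokio::sync::{mpsc, Notify};

/// Stand-in for Timeline::MAX_GET_VECTORED_KEYS; illustrative only.
const MAX_BATCH: usize = 32;

#[tokio::main]
async fn main() {
    // Reader -> batcher: one parsed request at a time; None marks end of input.
    let (req_tx, mut req_rx) = mpsc::channel::<Option<u64>>(1);
    // Batcher -> executor: completed batches.
    let (batch_tx, mut batch_rx) = mpsc::channel::<Vec<u64>>(1);
    // Executor signals "I am idle, flush whatever you have".
    let ready = Arc::new(Notify::new());

    // Stage 1: reader task (the role of read_message_task in the diff).
    tokio::spawn(async move {
        for page in 0..100u64 {
            if req_tx.send(Some(page)).await.is_err() {
                return; // downstream gone, stop reading
            }
        }
        let _ = req_tx.send(None).await; // EOF marker
    });

    // Stage 2: batcher task (the role of pagestream_do_batch in the diff).
    tokio::spawn({
        let ready = Arc::clone(&ready);
        async move {
            let mut batch: Vec<u64> = Vec::new();
            let mut stop = false;
            while !stop {
                let flush = tokio::select! {
                    req = req_rx.recv() => match req {
                        Some(Some(page)) => {
                            batch.push(page);
                            batch.len() >= MAX_BATCH // flush only when full
                        }
                        // EOF marker or reader task gone: flush leftovers, then stop.
                        Some(None) | None => {
                            stop = true;
                            true
                        }
                    },
                    () = ready.notified() => true, // executor idle: flush early
                };
                if flush && !batch.is_empty() {
                    if batch_tx.send(std::mem::take(&mut batch)).await.is_err() {
                        break; // executor gone
                    }
                }
            }
        }
    });

    // Stage 3: executor on the connection task (the role of the batched-message
    // handler). Notify *before* executing so the batcher can assemble the next
    // batch while this one runs.
    while let Some(batch) = batch_rx.recv().await {
        ready.notify_one();
        println!("executing batch of {} getpage requests", batch.len());
    }
}

The capacity-1 channels provide the backpressure the diff relies on: the reader can be at most one message ahead of the batcher, and the batcher at most one batch ahead of the executor, which bounds memory and lets a demand-driven flush replace the removed server_side_batch_timeout timer.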