diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3eccfe22a8..2ae8d91b83 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3175,7 +3175,7 @@ pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy>(), ) .unwrap() }); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b161e4e007..b5c6353334 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -5,7 +5,7 @@ use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use futures::FutureExt; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -577,141 +577,186 @@ impl PageServerHandler { } } + let mut requests = Vec::new(); let mut num_consecutive_getpage_requests = 0; - loop { + 'outer: loop { // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) - let msg = loop { + let mut debounce: Option = None; + requests.clear(); + loop { + static BOUNCE_TIMEOUT: Lazy = Lazy::new(|| { + utils::env::var::("NEON_PAGESERVER_DEBOUNCE") + .unwrap() + .into() + }); + let sleep_fut = if let Some(started_at) = debounce { + futures::future::Either::Left(tokio::time::sleep_until( + (started_at + *BOUNCE_TIMEOUT).into(), + )) + } else { + futures::future::Either::Right(futures::future::pending()) + }; tokio::select! { biased; _ = self.cancel.cancelled() => { return Err(QueryError::Shutdown) } - msg = pgb.read_message() => { break msg; } - () = futures::future::ready(()), if num_consecutive_getpage_requests > 0 => { - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(num_consecutive_getpage_requests as f64); + msg = pgb.read_message() => { + requests.push(msg); + let started_at = debounce.get_or_insert_with(Instant::now); + if started_at.elapsed() > *BOUNCE_TIMEOUT { + break; + } + } + _ = sleep_fut => { + break; + } + } + } + + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + + for msg in requests.drain(..) { + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(FeMessage::Terminate) => break 'outer, + Some(m) => { + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); + } + None => break 'outer, // client disconnected + }; + + trace!("query: {copy_data_bytes:?}"); + fail::fail_point!("ps::handle-pagerequest-message"); + + // parse request + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + // invoke handler function + let (handler_result, span) = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; + + fail::fail_point!("ps::handle-pagerequest-message::exists"); + let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); + ( + self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) } - } - }; - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => break, - Some(m) => { - return Err(QueryError::Other(anyhow::anyhow!( - "unexpected message: {m:?} during COPY" - ))); - } - None => break, // client disconnected - }; - - trace!("query: {copy_data_bytes:?}"); - fail::fail_point!("ps::handle-pagerequest-message"); - - // parse request - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; - - // invoke handler function - let (handler_result, span) = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { - fail::fail_point!("ps::handle-pagerequest-message::exists"); - let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( - self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::Nblocks(req) => { - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( - self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::GetPage(req) => { - num_consecutive_getpage_requests += 1; - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - // shard_id is filled in by the handler - let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); - ( - self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::DbSize(req) => { - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); - ( - self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::GetSlruSegment(req) => { - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); - ( - self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - }; - - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. - // Other handler errors are sent back as an error message and we stay in pagestream protocol. - let response_msg = match handler_result { - Err(e) => match &e { - PageStreamError::Shutdown => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); + PagestreamFeMessage::Nblocks(req) => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); + ( + self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); + PagestreamFeMessage::GetPage(req) => { + num_consecutive_getpage_requests += 1; + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + // shard_id is filled in by the handler + let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); + ( + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) } - PageStreamError::Read(_) - | PageStreamError::LsnTimeout(_) - | PageStreamError::NotFound(_) - | PageStreamError::BadRequest(_) => { - // print the all details to the log with {:#}, but for the client the - // error message is enough. Do not log if shutting down, as the anyhow::Error - // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e); - span.in_scope(|| { - error!("error reading relation or page version: {full:#}") - }); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) + PagestreamFeMessage::DbSize(req) => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); + ( + self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) } - }, - Ok(response_msg) => response_msg, - }; + PagestreamFeMessage::GetSlruSegment(req) => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); + ( + self.handle_get_slru_segment_request( + tenant_id, + timeline_id, + &req, + &ctx, + ) + .instrument(span.clone()) + .await, + span, + ) + } + }; - // marshal & transmit response message - pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - tokio::select! { - biased; - _ = self.cancel.cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) - } - res = pgb.flush() => { - res?; + // Map handler result to protocol behavior. + // Some handler errors cause exit from pagestream protocol. + // Other handler errors are sent back as an error message and we stay in pagestream protocol. + let response_msg = match handler_result { + Err(e) => match &e { + PageStreamError::Shutdown => { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropping connection due to shutdown")); + return Err(QueryError::Shutdown); + } + PageStreamError::Reconnect(reason) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); + } + PageStreamError::Read(_) + | PageStreamError::LsnTimeout(_) + | PageStreamError::NotFound(_) + | PageStreamError::BadRequest(_) => { + // print the all details to the log with {:#}, but for the client the + // error message is enough. Do not log if shutting down, as the anyhow::Error + // here includes cancellation which is not an error. + let full = utils::error::report_compact_sources(&e); + span.in_scope(|| { + error!("error reading relation or page version: {full:#}") + }); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + } + }, + Ok(response_msg) => response_msg, + }; + + // marshal & transmit response message + pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + // We were requested to shut down. + info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown) + } + res = pgb.flush() => { + res?; + } } } }