diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c4011d593c..3eccfe22a8 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3170,6 +3170,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy = + Lazy::new(|| { + register_histogram!( + "pageserver_consecutive_nonblocking_getpage_requests", + "Number of consecutive nonblocking getpage requests", + vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, ], + ) + .unwrap() + }); + pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) { static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(()); let _guard = SERIALIZE.lock().unwrap(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 39c6a6fb74..8f326de5ed 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -43,7 +43,7 @@ use crate::basebackup; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::metrics; +use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -577,14 +577,21 @@ impl PageServerHandler { } } + let mut num_consecutive_getpage_requests = 0; loop { // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) - let msg = tokio::select! { - biased; - _ = self.cancel.cancelled() => { - return Err(QueryError::Shutdown) + let msg = loop { + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + return Err(QueryError::Shutdown) + } + msg = pgb.read_message() => { break msg; } + () = futures::future::ready(()) => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + } } - msg = pgb.read_message() => { msg } }; let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, @@ -626,6 +633,7 @@ impl PageServerHandler { ) } PagestreamFeMessage::GetPage(req) => { + num_consecutive_getpage_requests += 1; fail::fail_point!("ps::handle-pagerequest-message::getpage"); // shard_id is filled in by the handler let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);