diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b5c6353334..daac888963 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -596,29 +596,18 @@ impl PageServerHandler { } else { futures::future::Either::Right(futures::future::pending()) }; - tokio::select! { + let msg = tokio::select! { biased; _ = self.cancel.cancelled() => { return Err(QueryError::Shutdown) } msg = pgb.read_message() => { - requests.push(msg); - let started_at = debounce.get_or_insert_with(Instant::now); - if started_at.elapsed() > *BOUNCE_TIMEOUT { - break; - } + msg } _ = sleep_fut => { break; } - } - } - - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - - for msg in requests.drain(..) { + }; let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => break 'outer, @@ -629,13 +618,26 @@ impl PageServerHandler { } None => break 'outer, // client disconnected }; - trace!("query: {copy_data_bytes:?}"); fail::fail_point!("ps::handle-pagerequest-message"); // parse request let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + requests.push(neon_fe_msg); + + // debounce + let started_at = debounce.get_or_insert_with(Instant::now); + if started_at.elapsed() > *BOUNCE_TIMEOUT { + break; + } + } + + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM + .observe(num_consecutive_getpage_requests as f64); + num_consecutive_getpage_requests = 0; + + for neon_fe_msg in requests.drain(..) { // invoke handler function let (handler_result, span) = match neon_fe_msg { PagestreamFeMessage::Exists(req) => {