deboucner: move decoding into debounce loop

This commit is contained in:
Christian Schwarz
2024-09-12 10:58:09 +00:00
parent 2d6763882e
commit ac2702afd3

View File

@@ -596,29 +596,18 @@ impl PageServerHandler {
} else {
futures::future::Either::Right(futures::future::pending())
};
tokio::select! {
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
requests.push(msg);
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break;
}
msg
}
_ = sleep_fut => {
break;
}
}
}
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
for msg in requests.drain(..) {
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
@@ -629,13 +618,26 @@ impl PageServerHandler {
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
requests.push(neon_fe_msg);
// debounce
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break;
}
}
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
for neon_fe_msg in requests.drain(..) {
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {