mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
fix execution errors
This commit is contained in:
@@ -1311,31 +1311,31 @@ impl PageServerHandler {
|
||||
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
|
||||
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
|
||||
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
|
||||
let flush_timers = {
|
||||
let flushing_start_time = Instant::now();
|
||||
let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
for handler_result in &mut handler_results {
|
||||
let flush_timer = match handler_result {
|
||||
Ok((_, timer)) => Some(
|
||||
timer
|
||||
.observe_execution_end(flushing_start_time)
|
||||
.expect("we are the first caller"),
|
||||
),
|
||||
Err(_) => {
|
||||
// TODO: measure errors
|
||||
None
|
||||
}
|
||||
};
|
||||
flush_timers.push(flush_timer);
|
||||
}
|
||||
assert_eq!(flush_timers.len(), handler_results.len());
|
||||
flush_timers
|
||||
};
|
||||
// let flush_timers = {
|
||||
// let flushing_start_time = Instant::now();
|
||||
// let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
// for handler_result in &mut handler_results {
|
||||
// let flush_timer = match handler_result {
|
||||
// Ok((_, timer)) => Some(
|
||||
// timer
|
||||
// .observe_execution_end(flushing_start_time)
|
||||
// .expect("we are the first caller"),
|
||||
// ),
|
||||
// Err(_) => {
|
||||
// // TODO: measure errors
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
// flush_timers.push(flush_timer);
|
||||
// }
|
||||
// assert_eq!(flush_timers.len(), handler_results.len());
|
||||
// flush_timers
|
||||
// };
|
||||
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
|
||||
for handler_result in handler_results.into_iter() {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
@@ -1384,14 +1384,14 @@ impl PageServerHandler {
|
||||
let socket_fd = pgb_writer.socket_fd;
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
Instant::now(),
|
||||
flush_fut,
|
||||
socket_fd,
|
||||
)),
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// let flush_fut = match flushing_timer {
|
||||
// Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
// Instant::now(),
|
||||
// flush_fut,
|
||||
// socket_fd,
|
||||
// )),
|
||||
// None => futures::future::Either::Right(flush_fut),
|
||||
// };
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
|
||||
Reference in New Issue
Block a user