diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3472dda378..4ed6b1ef27 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -22,6 +22,7 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; +use std::pin::Pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -316,6 +317,8 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, + + server_side_batch_timer: Pin>, } struct Carry { @@ -585,6 +588,7 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, + server_side_batch_timer: Box::pin(tokio::time::sleep(Duration::ZERO)), } } @@ -650,24 +654,12 @@ impl PageServerHandler { } Some(batch_timeout) => { // Take into consideration the time the carry spent waiting. - let batch_timeout = - batch_timeout.saturating_sub(carry.started_at.elapsed()); - if batch_timeout.is_zero() { - // the timer doesn't support restarting with zero duration - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])); - } else { - batching_deadline_storage = Some(Box::pin(async move { - tokio::time::sleep(batch_timeout).await; - })); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) - } + let batch_deadline = carry.started_at + batch_timeout; + self.server_side_batch_timer.as_mut().reset(batch_deadline.into()); + batching_deadline_storage = Some(&mut self.server_side_batch_timer); + Either::Right( + batching_deadline_storage.as_mut().expect("we just set it"), + ) } } }