diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3c15d0353d..3e01887355 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -23,6 +23,7 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; +use std::pin::Pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -318,7 +319,7 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, - server_side_batch_interval: Option, + server_side_batch_timer: Pin>, } struct Carry { @@ -588,8 +589,7 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, - server_side_batch_interval: server_side_batch_timeout - .map(|timeout| async_timer::Interval::new(timeout)), + server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration } } @@ -631,22 +631,50 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); + let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell? + loop { // Create a future that will become ready when we need to stop batching. use futures::future::Either; - let batching_deadline = match &*maybe_carry as &Option { - None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched - Some(carry) => match &mut self.server_side_batch_interval { - None => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])) + let batching_deadline = match ( + &*maybe_carry as &Option, + &mut batching_deadline_storage, + ) { + (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched + (None, Some(_)) => unreachable!(), + (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran + (Some(carry), None) => { + match self.server_side_batch_timeout { + None => { + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])) + } + Some(batch_timeout) => { + // Take into consideration the time the carry spent waiting. + let batch_timeout = + batch_timeout.saturating_sub(carry.started_at.elapsed()); + if batch_timeout.is_zero() { + // the timer doesn't support restarting with zero duration + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])); + } else { + self.server_side_batch_timer.restart(batch_timeout); + batching_deadline_storage = Some(&mut self.server_side_batch_timer); + Either::Right( + batching_deadline_storage.as_mut().expect("we just set it"), + ) + } + } } - Some(interval) => Either::Right(interval), - }, + } }; let msg = tokio::select! { biased;