From 89b6cb8ebaf5ce79f801d0784391d6686765f720 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 Nov 2024 20:17:49 +0100 Subject: [PATCH] Revert "vanilla tokio based timer impl based on tokio::time::Sleep" This reverts commit 517dda849f713d3374d44dd397549e5a2769a3ac. --- pageserver/src/page_service.rs | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4ed6b1ef27..3472dda378 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -22,7 +22,6 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; -use std::pin::Pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -317,8 +316,6 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, - - server_side_batch_timer: Pin>, } struct Carry { @@ -588,7 +585,6 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, - server_side_batch_timer: Box::pin(tokio::time::sleep(Duration::ZERO)), } } @@ -654,12 +650,24 @@ impl PageServerHandler { } Some(batch_timeout) => { // Take into consideration the time the carry spent waiting. - let batch_deadline = carry.started_at + batch_timeout; - self.server_side_batch_timer.as_mut().reset(batch_deadline.into()); - batching_deadline_storage = Some(&mut self.server_side_batch_timer); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) + let batch_timeout = + batch_timeout.saturating_sub(carry.started_at.elapsed()); + if batch_timeout.is_zero() { + // the timer doesn't support restarting with zero duration + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])); + } else { + batching_deadline_storage = Some(Box::pin(async move { + tokio::time::sleep(batch_timeout).await; + })); + Either::Right( + batching_deadline_storage.as_mut().expect("we just set it"), + ) + } } } }