From 721643beedce7da9005b5dc32cf1d139ea5fa58b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 Nov 2024 18:50:48 +0100 Subject: [PATCH] try interval-based impl to cross-chec => zero batching https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e00478065a9b3e51726082885 --- pageserver/src/page_service.rs | 58 +++++++++------------------------- 1 file changed, 15 insertions(+), 43 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3e01887355..3c15d0353d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -23,7 +23,6 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; -use std::pin::Pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -319,7 +318,7 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, - server_side_batch_timer: Pin>, + server_side_batch_interval: Option, } struct Carry { @@ -589,7 +588,8 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, - server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration + server_side_batch_interval: server_side_batch_timeout + .map(|timeout| async_timer::Interval::new(timeout)), } } @@ -631,50 +631,22 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell? - loop { // Create a future that will become ready when we need to stop batching. use futures::future::Either; - let batching_deadline = match ( - &*maybe_carry as &Option, - &mut batching_deadline_storage, - ) { - (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched - (None, Some(_)) => unreachable!(), - (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran - (Some(carry), None) => { - match self.server_side_batch_timeout { - None => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])) - } - Some(batch_timeout) => { - // Take into consideration the time the carry spent waiting. - let batch_timeout = - batch_timeout.saturating_sub(carry.started_at.elapsed()); - if batch_timeout.is_zero() { - // the timer doesn't support restarting with zero duration - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])); - } else { - self.server_side_batch_timer.restart(batch_timeout); - batching_deadline_storage = Some(&mut self.server_side_batch_timer); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) - } - } + let batching_deadline = match &*maybe_carry as &Option { + None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched + Some(carry) => match &mut self.server_side_batch_interval { + None => { + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])) } - } + Some(interval) => Either::Right(interval), + }, }; let msg = tokio::select! { biased;