From f9bf038d2cb297351a38421cec403b38b0da7fcc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 Nov 2024 15:25:52 +0100 Subject: [PATCH] Revert "tokio_timerfd::Interval" This reverts commit 12124b28d0290d396f9b5d4902129f57c7085a4a. --- Cargo.lock | 23 ------- Cargo.toml | 1 - pageserver/Cargo.toml | 1 - pageserver/src/page_service.rs | 62 +++++++++++++------ .../test_pageserver_getpage_merge.py | 4 +- 5 files changed, 43 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1b01de453..c7af140f7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3655,7 +3655,6 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-tar", - "tokio-timerfd", "tokio-util", "toml_edit", "tracing", @@ -6136,15 +6135,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "timerfd" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091" -dependencies = [ - "rustix", -] - [[package]] name = "tiny-keccak" version = "2.0.2" @@ -6329,19 +6319,6 @@ dependencies = [ "xattr", ] -[[package]] -name = "tokio-timerfd" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094" -dependencies = [ - "futures-core", - "libc", - "slab", - "timerfd", - "tokio", -] - [[package]] name = "tokio-tungstenite" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index 96b56762e0..dbda930535 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,7 +177,6 @@ tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} tokio-stream = "0.1" tokio-tar = "0.3" -tokio-timerfd = "0.2.0" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 135aee6d54..143d8236df 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -63,7 +63,6 @@ tokio-postgres.workspace = true tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } -tokio-timerfd.workspace = true tracing.workspace = true url.workspace = true walkdir.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1d4ef5e42c..3472dda378 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,7 +4,7 @@ use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{self, TenantState}; @@ -316,8 +316,6 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, - - server_side_batch_interval: Option, } struct Carry { @@ -587,11 +585,6 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, - server_side_batch_interval: server_side_batch_timeout.map(|timeout| { - // The timerfd missed tick behavior is equivalent to - // tokio::time::MissedTickBehavior::Delay - tokio_timerfd::Interval::new_interval(timeout).expect("TODO") - }), } } @@ -633,22 +626,51 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); + let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell? + loop { // Create a future that will become ready when we need to stop batching. use futures::future::Either; - let batching_deadline = match &*maybe_carry as &Option { - None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched - Some(carry) => match &mut self.server_side_batch_interval { - None => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])) + let batching_deadline = match ( + &*maybe_carry as &Option, + &mut batching_deadline_storage, + ) { + (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched + (None, Some(_)) => unreachable!(), + (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran + (Some(carry), None) => { + match self.server_side_batch_timeout { + None => { + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])) + } + Some(batch_timeout) => { + // Take into consideration the time the carry spent waiting. + let batch_timeout = + batch_timeout.saturating_sub(carry.started_at.elapsed()); + if batch_timeout.is_zero() { + // the timer doesn't support restarting with zero duration + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])); + } else { + batching_deadline_storage = Some(Box::pin(async move { + tokio::time::sleep(batch_timeout).await; + })); + Either::Right( + batching_deadline_storage.as_mut().expect("we just set it"), + ) + } + } } - Some(interval) => Either::Right(interval.next()), - }, + } }; let msg = tokio::select! { biased; diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py index 850bc1b8df..be7cf66c79 100644 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py @@ -9,7 +9,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.utils import humantime_to_ms -TARGET_RUNTIME = 5 +TARGET_RUNTIME = 60 @pytest.mark.parametrize( @@ -18,12 +18,10 @@ TARGET_RUNTIME = 5 # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"), (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"), - (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"), (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"), # the next 4 cases demonstrate how batchable workloads benefit from batching (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"), (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"), - (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"), (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"), (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"), ],