From 12124b28d0290d396f9b5d4902129f57c7085a4a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 Nov 2024 15:25:14 +0100 Subject: [PATCH] tokio_timerfd::Interval Resolution not high enough to do _any_ batching at 10us or 20us https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e0047800fb74bd8f4ab6cf8e2 --- Cargo.lock | 23 +++++++ Cargo.toml | 1 + pageserver/Cargo.toml | 1 + pageserver/src/page_service.rs | 62 ++++++------------- .../test_pageserver_getpage_merge.py | 4 +- 5 files changed, 48 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7af140f7d..a1b01de453 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3655,6 +3655,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-tar", + "tokio-timerfd", "tokio-util", "toml_edit", "tracing", @@ -6135,6 +6136,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "timerfd" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091" +dependencies = [ + "rustix", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -6319,6 +6329,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-timerfd" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094" +dependencies = [ + "futures-core", + "libc", + "slab", + "timerfd", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index dbda930535..96b56762e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,6 +177,7 @@ tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} tokio-stream = "0.1" tokio-tar = "0.3" +tokio-timerfd = "0.2.0" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 143d8236df..135aee6d54 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -63,6 +63,7 @@ tokio-postgres.workspace = true tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } +tokio-timerfd.workspace = true tracing.workspace = true url.workspace = true walkdir.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3472dda378..1d4ef5e42c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,7 +4,7 @@ use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{self, TenantState}; @@ -316,6 +316,8 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, + + server_side_batch_interval: Option, } struct Carry { @@ -585,6 +587,11 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, + server_side_batch_interval: server_side_batch_timeout.map(|timeout| { + // The timerfd missed tick behavior is equivalent to + // tokio::time::MissedTickBehavior::Delay + tokio_timerfd::Interval::new_interval(timeout).expect("TODO") + }), } } @@ -626,51 +633,22 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell? - loop { // Create a future that will become ready when we need to stop batching. use futures::future::Either; - let batching_deadline = match ( - &*maybe_carry as &Option, - &mut batching_deadline_storage, - ) { - (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched - (None, Some(_)) => unreachable!(), - (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran - (Some(carry), None) => { - match self.server_side_batch_timeout { - None => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])) - } - Some(batch_timeout) => { - // Take into consideration the time the carry spent waiting. - let batch_timeout = - batch_timeout.saturating_sub(carry.started_at.elapsed()); - if batch_timeout.is_zero() { - // the timer doesn't support restarting with zero duration - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])); - } else { - batching_deadline_storage = Some(Box::pin(async move { - tokio::time::sleep(batch_timeout).await; - })); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) - } - } + let batching_deadline = match &*maybe_carry as &Option { + None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched + Some(carry) => match &mut self.server_side_batch_interval { + None => { + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])) } - } + Some(interval) => Either::Right(interval.next()), + }, }; let msg = tokio::select! { biased; diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py index be7cf66c79..850bc1b8df 100644 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py @@ -9,7 +9,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.utils import humantime_to_ms -TARGET_RUNTIME = 60 +TARGET_RUNTIME = 5 @pytest.mark.parametrize( @@ -18,10 +18,12 @@ TARGET_RUNTIME = 60 # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"), (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"), + (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"), (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"), # the next 4 cases demonstrate how batchable workloads benefit from batching (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"), (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"), + (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"), (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"), (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"), ],