diff --git a/Cargo.lock b/Cargo.lock index a1b01de453..c7af140f7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3655,7 +3655,6 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-tar", - "tokio-timerfd", "tokio-util", "toml_edit", "tracing", @@ -6136,15 +6135,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "timerfd" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091" -dependencies = [ - "rustix", -] - [[package]] name = "tiny-keccak" version = "2.0.2" @@ -6329,19 +6319,6 @@ dependencies = [ "xattr", ] -[[package]] -name = "tokio-timerfd" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094" -dependencies = [ - "futures-core", - "libc", - "slab", - "timerfd", - "tokio", -] - [[package]] name = "tokio-tungstenite" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index 96b56762e0..dbda930535 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,7 +177,6 @@ tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} tokio-stream = "0.1" tokio-tar = "0.3" -tokio-timerfd = "0.2.0" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 135aee6d54..143d8236df 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -63,7 +63,6 @@ tokio-postgres.workspace = true tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } -tokio-timerfd.workspace = true tracing.workspace = true url.workspace = true walkdir.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0e58997769..3472dda378 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -316,8 +316,6 @@ struct PageServerHandler { /// See [`PageServerConf::server_side_batch_timeout`] server_side_batch_timeout: Option, - - server_side_batch_timer: tokio_timerfd::Delay, } struct Carry { @@ -587,7 +585,6 @@ impl PageServerHandler { timeline_handles: TimelineHandles::new(tenant_manager), cancel, server_side_batch_timeout, - server_side_batch_timer: tokio_timerfd::Delay::new(Instant::now()).unwrap(), } } @@ -653,12 +650,24 @@ impl PageServerHandler { } Some(batch_timeout) => { // Take into consideration the time the carry spent waiting. - let batch_deadline = carry.started_at + batch_timeout; - self.server_side_batch_timer.reset(batch_deadline); - batching_deadline_storage = Some(&mut self.server_side_batch_timer); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) + let batch_timeout = + batch_timeout.saturating_sub(carry.started_at.elapsed()); + if batch_timeout.is_zero() { + // the timer doesn't support restarting with zero duration + return Ok(BatchOrEof::Batch(smallvec::smallvec![ + maybe_carry + .take() + .expect("we already checked it's Some") + .msg + ])); + } else { + batching_deadline_storage = Some(Box::pin(async move { + tokio::time::sleep(batch_timeout).await; + })); + Either::Right( + batching_deadline_storage.as_mut().expect("we just set it"), + ) + } } } }