tokio_timerfd::Delay based impl

Performs identically great to the async-timer::Timer features=tokio1 impl
Makes sense because it's the same thing that's happening under the hood.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780ea9decc82281f6b8d1
This commit is contained in:
Christian Schwarz
2024-11-20 19:35:51 +01:00
parent 469ce810fc
commit fcda7a72c6
4 changed files with 34 additions and 18 deletions

23
Cargo.lock generated
View File

@@ -3655,6 +3655,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-timerfd",
"tokio-util",
"toml_edit",
"tracing",
@@ -6135,6 +6136,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "timerfd"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091"
dependencies = [
"rustix",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
@@ -6319,6 +6329,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-timerfd"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094"
dependencies = [
"futures-core",
"libc",
"slab",
"timerfd",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"

View File

@@ -177,6 +177,7 @@ tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-timerfd = "0.2.0"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"

View File

@@ -63,6 +63,7 @@ tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tokio-timerfd.workspace = true
tracing.workspace = true
url.workspace = true
walkdir.workspace = true

View File

@@ -316,6 +316,8 @@ struct PageServerHandler {
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
server_side_batch_timer: tokio_timerfd::Delay,
}
struct Carry {
@@ -585,6 +587,7 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: tokio_timerfd::Delay::new(Instant::now()).unwrap(),
}
}
@@ -650,24 +653,12 @@ impl PageServerHandler {
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
let batch_deadline = carry.started_at + batch_timeout;
self.server_side_batch_timer.reset(batch_deadline);
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
}