mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
tokio_timerfd::Interval
Resolution not high enough to do _any_ batching at 10us or 20us https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e0047800fb74bd8f4ab6cf8e2
This commit is contained in:
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -3655,6 +3655,7 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-timerfd",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
@@ -6135,6 +6136,15 @@ dependencies = [
|
||||
"time-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timerfd"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091"
|
||||
dependencies = [
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tiny-keccak"
|
||||
version = "2.0.2"
|
||||
@@ -6319,6 +6329,19 @@ dependencies = [
|
||||
"xattr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-timerfd"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"libc",
|
||||
"slab",
|
||||
"timerfd",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.21.0"
|
||||
|
||||
@@ -177,6 +177,7 @@ tokio-postgres-rustls = "0.12.0"
|
||||
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
|
||||
tokio-stream = "0.1"
|
||||
tokio-tar = "0.3"
|
||||
tokio-timerfd = "0.2.0"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
|
||||
@@ -63,6 +63,7 @@ tokio-postgres.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tokio-timerfd.workspace = true
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
walkdir.workspace = true
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use anyhow::{bail, Context};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{self, TenantState};
|
||||
@@ -316,6 +316,8 @@ struct PageServerHandler {
|
||||
|
||||
/// See [`PageServerConf::server_side_batch_timeout`]
|
||||
server_side_batch_timeout: Option<Duration>,
|
||||
|
||||
server_side_batch_interval: Option<tokio_timerfd::Interval>,
|
||||
}
|
||||
|
||||
struct Carry {
|
||||
@@ -585,6 +587,11 @@ impl PageServerHandler {
|
||||
timeline_handles: TimelineHandles::new(tenant_manager),
|
||||
cancel,
|
||||
server_side_batch_timeout,
|
||||
server_side_batch_interval: server_side_batch_timeout.map(|timeout| {
|
||||
// The timerfd missed tick behavior is equivalent to
|
||||
// tokio::time::MissedTickBehavior::Delay
|
||||
tokio_timerfd::Interval::new_interval(timeout).expect("TODO")
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -626,51 +633,22 @@ impl PageServerHandler {
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
|
||||
|
||||
loop {
|
||||
// Create a future that will become ready when we need to stop batching.
|
||||
use futures::future::Either;
|
||||
let batching_deadline = match (
|
||||
&*maybe_carry as &Option<Carry>,
|
||||
&mut batching_deadline_storage,
|
||||
) {
|
||||
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
|
||||
(None, Some(_)) => unreachable!(),
|
||||
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
|
||||
(Some(carry), None) => {
|
||||
match self.server_side_batch_timeout {
|
||||
None => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]))
|
||||
}
|
||||
Some(batch_timeout) => {
|
||||
// Take into consideration the time the carry spent waiting.
|
||||
let batch_timeout =
|
||||
batch_timeout.saturating_sub(carry.started_at.elapsed());
|
||||
if batch_timeout.is_zero() {
|
||||
// the timer doesn't support restarting with zero duration
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]));
|
||||
} else {
|
||||
batching_deadline_storage = Some(Box::pin(async move {
|
||||
tokio::time::sleep(batch_timeout).await;
|
||||
}));
|
||||
Either::Right(
|
||||
batching_deadline_storage.as_mut().expect("we just set it"),
|
||||
)
|
||||
}
|
||||
}
|
||||
let batching_deadline = match &*maybe_carry as &Option<Carry> {
|
||||
None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
|
||||
Some(carry) => match &mut self.server_side_batch_interval {
|
||||
None => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]))
|
||||
}
|
||||
}
|
||||
Some(interval) => Either::Right(interval.next()),
|
||||
},
|
||||
};
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -9,7 +9,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import humantime_to_ms
|
||||
|
||||
TARGET_RUNTIME = 60
|
||||
TARGET_RUNTIME = 5
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -18,10 +18,12 @@ TARGET_RUNTIME = 60
|
||||
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
|
||||
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
|
||||
(50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
|
||||
# the next 4 cases demonstrate how batchable workloads benefit from batching
|
||||
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
|
||||
(50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
|
||||
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
|
||||
],
|
||||
|
||||
Reference in New Issue
Block a user