tokio::time::Interval-based approach

Batching at a 10us timeout doesn't work well enough; probably the batching
future becomes ready too soon. The batching factor is only 1.5, i.e. on
average only about 1.5 requests end up in each executed batch.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780b79c8dd6d007dbb120
Author: Christian Schwarz
Date:   2024-11-20 15:12:34 +01:00
parent  f3ed5692ea
commit  81d99704ee

2 changed files with 22 additions and 42 deletions


@@ -316,6 +316,8 @@ struct PageServerHandler {
     /// See [`PageServerConf::server_side_batch_timeout`]
     server_side_batch_timeout: Option<Duration>,
+    server_side_batch_interval: Option<tokio::time::Interval>,
 }
 
 struct Carry {
@@ -585,6 +587,11 @@ impl PageServerHandler {
             timeline_handles: TimelineHandles::new(tenant_manager),
             cancel,
             server_side_batch_timeout,
+            server_side_batch_interval: server_side_batch_timeout.map(|timeout| {
+                let mut itv = tokio::time::interval(timeout);
+                itv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+                itv
+            }),
         }
     }
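Side note, not part of the diff: a minimal standalone sketch of what the new field does, assuming a current tokio runtime. One detail that may matter for the 10us result above is that the first tick() of a tokio::time::Interval completes immediately; only later ticks wait out the period, and ticks are scheduled relative to the interval's creation rather than to when the current batch started, so the batching deadline can fire sooner than the configured timeout.

// Standalone sketch, not pageserver code: the interval is created the same
// way as in the constructor above.
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timeout = Duration::from_micros(10);
    let mut itv = tokio::time::interval(timeout);
    itv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let started = tokio::time::Instant::now();
    itv.tick().await; // the first tick resolves immediately
    itv.tick().await; // this one waits out (roughly) the 10us period
    println!("two ticks took {:?}", started.elapsed());
}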
@@ -626,51 +633,22 @@ impl PageServerHandler {
     {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
-        let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
         loop {
             // Create a future that will become ready when we need to stop batching.
             use futures::future::Either;
-            let batching_deadline = match (
-                &*maybe_carry as &Option<Carry>,
-                &mut batching_deadline_storage,
-            ) {
-                (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
-                (None, Some(_)) => unreachable!(),
-                (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
-                (Some(carry), None) => {
-                    match self.server_side_batch_timeout {
-                        None => {
-                            return Ok(BatchOrEof::Batch(smallvec::smallvec![
-                                maybe_carry
-                                    .take()
-                                    .expect("we already checked it's Some")
-                                    .msg
-                            ]))
-                        }
-                        Some(batch_timeout) => {
-                            // Take into consideration the time the carry spent waiting.
-                            let batch_timeout =
-                                batch_timeout.saturating_sub(carry.started_at.elapsed());
-                            if batch_timeout.is_zero() {
-                                // the timer doesn't support restarting with zero duration
-                                return Ok(BatchOrEof::Batch(smallvec::smallvec![
-                                    maybe_carry
-                                        .take()
-                                        .expect("we already checked it's Some")
-                                        .msg
-                                ]));
-                            } else {
-                                batching_deadline_storage = Some(Box::pin(async move {
-                                    tokio::time::sleep(batch_timeout).await;
-                                }));
-                                Either::Right(
-                                    batching_deadline_storage.as_mut().expect("we just set it"),
-                                )
-                            }
-                        }
-                    }
-                }
+            let batching_deadline = match &*maybe_carry as &Option<Carry> {
+                None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
+                Some(carry) => match &mut self.server_side_batch_interval {
+                    None => {
+                        return Ok(BatchOrEof::Batch(smallvec::smallvec![
+                            maybe_carry
+                                .take()
+                                .expect("we already checked it's Some")
+                                .msg
+                        ]))
+                    }
+                    Some(interval) => Either::Right(interval.tick()),
+                },
             };
             let msg = tokio::select! {
                 biased;

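For readers skimming the diff, here is a simplified, self-contained sketch of the batching pattern the new code implements. The mpsc channel, the u32 payload, and the batch_loop/flush naming are illustrative stand-ins rather than pageserver types: with no interval configured a message is flushed as soon as it arrives, and with an interval configured messages accumulate until the next tick.

// Simplified sketch of the select!-based batching loop; not pageserver code.
use futures::future::Either;
use std::time::Duration;
use tokio::sync::mpsc;

async fn batch_loop(mut rx: mpsc::Receiver<u32>, mut interval: Option<tokio::time::Interval>) {
    let mut batch: Vec<u32> = Vec::new();
    loop {
        // No deadline until at least one message is batched.
        let batching_deadline = match (batch.is_empty(), &mut interval) {
            (true, _) => Either::Left(futures::future::pending()),
            (false, None) => {
                // Batching disabled: flush as soon as we have one message.
                println!("flush (no batching): {batch:?}");
                batch.clear();
                continue;
            }
            (false, Some(itv)) => Either::Right(itv.tick()),
        };
        tokio::select! {
            biased;
            _ = batching_deadline => {
                println!("flush (deadline): {batch:?}");
                batch.clear();
            }
            maybe_msg = rx.recv() => match maybe_msg {
                Some(msg) => batch.push(msg),
                None => {
                    if !batch.is_empty() {
                        println!("flush (eof): {batch:?}");
                    }
                    return;
                }
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let mut itv = tokio::time::interval(Duration::from_micros(10));
    itv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    tokio::spawn(async move {
        for i in 0..8u32 {
            tx.send(i).await.unwrap();
        }
    });
    batch_loop(rx, Some(itv)).await;
}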

@@ -9,7 +9,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.utils import humantime_to_ms
 
-TARGET_RUNTIME = 60
+TARGET_RUNTIME = 5
 
 
 @pytest.mark.parametrize(
@@ -18,10 +18,12 @@ TARGET_RUNTIME = 60
         # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
         (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
         (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
+        (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
         (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
         # the next 4 cases demonstrate how batchable workloads benefit from batching
         (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
         (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
+        (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
         (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
         (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
     ],