async-timer based approach (again, with data)

Yep, it's clearly the best one with best batching factor at lowest CPU
usage.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780d0a205e081458b46db
This commit is contained in:
Christian Schwarz
2024-11-20 14:36:46 +01:00
parent f9bf038d2c
commit 689788cbba
5 changed files with 31 additions and 4 deletions

View File

@@ -3,6 +3,7 @@
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use async_timer::Oneshot;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
@@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -316,6 +318,8 @@ struct PageServerHandler {
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
server_side_batch_timer: Pin<Box<async_timer::oneshot::Timer>>,
}
struct Carry {
@@ -585,6 +589,9 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(async_timer::oneshot::Timer::new(
Duration::from_secs(999),
)), // reset each iteration
}
}
@@ -661,9 +668,13 @@ impl PageServerHandler {
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
std::future::poll_fn(|ctx| {
self.server_side_batch_timer
.restart(batch_timeout, ctx.waker());
std::task::Poll::Ready(())
})
.await;
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)