mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
try interval-based impl to cross-chec
=> zero batching https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e00478065a9b3e51726082885
This commit is contained in:
@@ -23,7 +23,6 @@ use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -319,7 +318,7 @@ struct PageServerHandler {
|
||||
/// See [`PageServerConf::server_side_batch_timeout`]
|
||||
server_side_batch_timeout: Option<Duration>,
|
||||
|
||||
server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
|
||||
server_side_batch_interval: Option<async_timer::Interval>,
|
||||
}
|
||||
|
||||
struct Carry {
|
||||
@@ -589,7 +588,8 @@ impl PageServerHandler {
|
||||
timeline_handles: TimelineHandles::new(tenant_manager),
|
||||
cancel,
|
||||
server_side_batch_timeout,
|
||||
server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
|
||||
server_side_batch_interval: server_side_batch_timeout
|
||||
.map(|timeout| async_timer::Interval::new(timeout)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -631,50 +631,22 @@ impl PageServerHandler {
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
|
||||
|
||||
loop {
|
||||
// Create a future that will become ready when we need to stop batching.
|
||||
use futures::future::Either;
|
||||
let batching_deadline = match (
|
||||
&*maybe_carry as &Option<Carry>,
|
||||
&mut batching_deadline_storage,
|
||||
) {
|
||||
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
|
||||
(None, Some(_)) => unreachable!(),
|
||||
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
|
||||
(Some(carry), None) => {
|
||||
match self.server_side_batch_timeout {
|
||||
None => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]))
|
||||
}
|
||||
Some(batch_timeout) => {
|
||||
// Take into consideration the time the carry spent waiting.
|
||||
let batch_timeout =
|
||||
batch_timeout.saturating_sub(carry.started_at.elapsed());
|
||||
if batch_timeout.is_zero() {
|
||||
// the timer doesn't support restarting with zero duration
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]));
|
||||
} else {
|
||||
self.server_side_batch_timer.restart(batch_timeout);
|
||||
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
|
||||
Either::Right(
|
||||
batching_deadline_storage.as_mut().expect("we just set it"),
|
||||
)
|
||||
}
|
||||
}
|
||||
let batching_deadline = match &*maybe_carry as &Option<Carry> {
|
||||
None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
|
||||
Some(carry) => match &mut self.server_side_batch_interval {
|
||||
None => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]))
|
||||
}
|
||||
}
|
||||
Some(interval) => Either::Right(interval),
|
||||
},
|
||||
};
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
Reference in New Issue
Block a user