Revert "vanilla tokio based timer impl based on tokio::time::Sleep"

This reverts commit 517dda849f.
This commit is contained in:
Christian Schwarz
2024-11-20 20:17:49 +01:00
parent c68661dfb3
commit 89b6cb8eba

View File

@@ -22,7 +22,6 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -317,8 +316,6 @@ struct PageServerHandler {
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
server_side_batch_timer: Pin<Box<tokio::time::Sleep>>,
}
struct Carry {
@@ -588,7 +585,6 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(tokio::time::sleep(Duration::ZERO)),
}
}
@@ -654,12 +650,24 @@ impl PageServerHandler {
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_deadline = carry.started_at + batch_timeout;
self.server_side_batch_timer.as_mut().reset(batch_deadline.into());
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
}
}