This commit is contained in:
Christian Schwarz
2024-11-19 19:01:55 +01:00
parent 61ff84a3a2
commit 911946a3cd

View File

@@ -637,7 +637,6 @@ impl PageServerHandler {
loop {
// Create a future that will become ready when we need to stop batching.
// If there's carry, take the time it already spent batching into consideration.
use futures::future::Either;
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
@@ -658,9 +657,8 @@ impl PageServerHandler {
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let now = Instant::now();
let batch_timeout =
batch_timeout.saturating_sub(now - carry.started_at);
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
@@ -816,8 +814,10 @@ impl PageServerHandler {
}
};
// check if we can batch
match (maybe_carry.take(), this_msg) {
//
// batch
//
match (maybe_carry.as_mut(), this_msg) {
(None, this_msg) => {
*maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
}
@@ -825,7 +825,7 @@ impl PageServerHandler {
Some(Carry { msg: BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: mut accum_pages,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
}, started_at: _}),
BatchedFeMessage::GetPage {
@@ -851,7 +851,7 @@ impl PageServerHandler {
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if accum_lsn != this_lsn {
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
@@ -864,10 +864,11 @@ impl PageServerHandler {
}
(Some(carry), this_msg) => {
// by default, don't continue batching
*maybe_carry = Some(Carry {
msg: this_msg,
started_at: msg_start,
});
let carry = std::mem::replace(carry,
Carry {
msg: this_msg,
started_at: msg_start,
});
return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
}
}