diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d25df2ef3e..c32f889958 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -637,7 +637,6 @@ impl PageServerHandler { loop { // Create a future that will become ready when we need to stop batching. - // If there's carry, take the time it already spent batching into consideration. use futures::future::Either; let batching_deadline = match ( &*maybe_carry as &Option, @@ -658,9 +657,8 @@ impl PageServerHandler { } Some(batch_timeout) => { // Take into consideration the time the carry spent waiting. - let now = Instant::now(); let batch_timeout = - batch_timeout.saturating_sub(now - carry.started_at); + batch_timeout.saturating_sub(carry.started_at.elapsed()); if batch_timeout.is_zero() { // the timer doesn't support restarting with zero duration return Ok(BatchOrEof::Batch(smallvec::smallvec![ @@ -816,8 +814,10 @@ impl PageServerHandler { } }; - // check if we can batch - match (maybe_carry.take(), this_msg) { + // + // batch + // + match (maybe_carry.as_mut(), this_msg) { (None, this_msg) => { *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start }); } @@ -825,7 +825,7 @@ impl PageServerHandler { Some(Carry { msg: BatchedFeMessage::GetPage { span: _, shard: accum_shard, - pages: mut accum_pages, + pages: ref mut accum_pages, effective_request_lsn: accum_lsn, }, started_at: _}), BatchedFeMessage::GetPage { @@ -851,7 +851,7 @@ impl PageServerHandler { } // the vectored get currently only supports a single LSN, so, bounce as soon // as the effective request_lsn changes - if accum_lsn != this_lsn { + if *accum_lsn != this_lsn { trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); return false; } @@ -864,10 +864,11 @@ impl PageServerHandler { } (Some(carry), this_msg) => { // by default, don't continue batching - *maybe_carry = Some(Carry { - msg: this_msg, - started_at: msg_start, - }); + let carry = std::mem::replace(carry, + Carry { + msg: this_msg, + started_at: msg_start, + }); return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg])); } }