From a23abb2cc0c88abf2dc615e37ab3ddc56d953c73 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 26 Nov 2024 13:30:40 +0100 Subject: [PATCH] adopt spsc_fold --- pageserver/src/page_service.rs | 176 +++++++-------------------------- 1 file changed, 38 insertions(+), 138 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index bbe90893cc..c8ea3a8ca7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -36,6 +36,7 @@ use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::sync::spsc_fold; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -755,25 +756,20 @@ impl PageServerHandler { #[allow(clippy::boxed_local)] fn pagestream_do_batch( max_batch_size: NonZeroUsize, - batch: &mut Option>, + batch: &mut Box, this_msg: Box, - ) -> Option> { + ) -> Result<(), Box> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - match (batch.as_deref_mut(), *this_msg) { - // nothing batched yet - (None, this_msg) => { - *batch = Some(Box::new(this_msg)); - None - } + match (&mut **batch, *this_msg) { // something batched already, let's see if we can add this message to the batch ( - Some(BatchedFeMessage::GetPage { + BatchedFeMessage::GetPage { span: _, shard: accum_shard, pages: ref mut accum_pages, effective_request_lsn: accum_lsn, - }), + }, // would be nice to have box pattern here BatchedFeMessage::GetPage { span: _, @@ -807,12 +803,12 @@ impl PageServerHandler { { // ok to batch accum_pages.extend(this_pages); - None + Ok(()) } // something batched already but this message is unbatchable - (Some(_), this_msg) => { + (_, this_msg) => { // by default, don't continue batching - Some(Box::new(this_msg)) // TODO: avoid re-box + Err(Box::new(this_msg)) // TODO: avoid re-box } } } @@ -1160,96 +1156,26 @@ impl PageServerHandler { // // Create Batching future. // - - enum BatchState { - Building(Option>), - ReadMessagesEnded(Option>), - ExecutorEnded, - } - let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new( - std::sync::Mutex::new(BatchState::Building(None)), - )); - let notify_batcher = Arc::new(tokio::sync::Notify::new()); - let batcher = { - let notify_batcher = notify_batcher.clone(); - async move { - scopeguard::defer! { - debug!("exiting"); - } - 'outer: loop { - let maybe_req = requests_rx.recv().await; - let Some(req) = maybe_req else { - batch_tx.send_modify(|pending_batch| { - let mut guard = pending_batch.lock().unwrap(); - match &mut *guard { - BatchState::Building(batch) => { - *guard = BatchState::ReadMessagesEnded(batch.take()); - } - BatchState::ReadMessagesEnded(_) => { - unreachable!("we exit the first time") - } - BatchState::ExecutorEnded => { - debug!("observing executor ended when reading upstream"); - } - } - }); + let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); + let batcher = async move { + scopeguard::defer! { + debug!("exiting"); + } + loop { + let maybe_req = requests_rx.recv().await; + let Some(req) = maybe_req else { + break; + }; + let send_res = batch_tx + .send(req, |batch, req| { + Self::pagestream_do_batch(max_batch_size, batch, req) + }) + .await; + match send_res { + Ok(()) => {} + Err(spsc_fold::SendError::ReceiverGone) => { + debug!("downstream is gone"); break; - }; - // don't read new requests before this one has been processed - let mut req = Some(req); - loop { - let mut wait_notified = None; - enum Outcome { - Batched, - CannotBatchNeedWaitForExecutor, - ExecutorEndObserved, - Undefined, - } - let mut outcome = Outcome::Undefined; - batch_tx.send_if_modified(|pending_batch| { - let mut guard = pending_batch.lock().unwrap(); - let building = match &mut *guard { - BatchState::Building(building) => building, - BatchState::ReadMessagesEnded(_) => { - unreachable!("we would have bailed earlier") - } - BatchState::ExecutorEnded => { - debug!("observing executor ended when trying to batch"); - outcome = Outcome::ExecutorEndObserved; - return false; - } - }; - match Self::pagestream_do_batch( - max_batch_size, - building, - req.take().unwrap(), - ) { - Some(req_was_not_batched) => { - outcome = Outcome::CannotBatchNeedWaitForExecutor; - req.replace(req_was_not_batched); - wait_notified = Some(notify_batcher.notified()); - false - } - None => { - outcome = Outcome::Batched; - true - } - } - }); - match outcome { - Outcome::Batched => { - break; - } - Outcome::CannotBatchNeedWaitForExecutor => { - wait_notified.unwrap().await; - } - Outcome::ExecutorEndObserved => { - break 'outer; - } - Outcome::Undefined => { - unreachable!("send_if_modified should always be called") - } - } } } } @@ -1261,49 +1187,23 @@ impl PageServerHandler { // let executor = async { - let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| { + scopeguard::defer! { debug!("exiting"); - let borrow = batch_rx.borrow(); - let mut guard = borrow.lock().unwrap(); - match &*guard { - BatchState::Building(_) | BatchState::ReadMessagesEnded(_) => {} - BatchState::ExecutorEnded => unreachable!("we only set this here"), - } - *guard = BatchState::ExecutorEnded; - }); - let mut stop = false; - while !stop { - match batch_rx.changed().await { - Ok(()) => {} - Err(_) => { - debug!("batch_rx observed disconnection of batcher"); + }; + loop { + let batch = match batch_rx.recv().await { + Ok(batch) => batch, + Err(spsc_fold::RecvError::SenderGone) => { + debug!("upstream gone"); + break; } }; - let maybe_batch = { - let borrow = batch_rx.borrow(); - let mut guard = borrow.lock().unwrap(); - match &mut *guard { - BatchState::Building(maybe_batch) => maybe_batch.take(), - BatchState::ReadMessagesEnded(maybe_batch) => { - debug!("upstream dead"); - stop = true; - maybe_batch.take() - } - BatchState::ExecutorEnded => { - unreachable!("we break out of this loop after we set this state"); - } - } - }; - let Some(batch) = maybe_batch else { - break; - }; - notify_batcher.notify_one(); - debug!("processing batch"); self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx) .await?; } Ok(()) - }; + } + .instrument(tracing::info_span!("executor")); // // Execute the stages until they exit.