From 88fd8aed52c262987952fbb49ce37325467a758e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 22:55:10 +0100 Subject: [PATCH] watch-based approach --- pageserver/src/page_service.rs | 138 +++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 60 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c272ccaf24..ef7d9de752 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -752,21 +752,17 @@ impl PageServerHandler { /// Post-condition: `maybe_carry` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] - async fn pagestream_do_batch( + fn pagestream_do_batch( maybe_carry: &mut Option>, this_msg: Box, ) -> Option> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); match (maybe_carry.as_deref_mut(), *this_msg) { - // new batch - (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { - *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing - None - } - // nothing batched yet and this message is unbatchable + // nothing batched yet (None, this_msg) => { - Some(Box::new(this_msg)) // TODO: avoid this re-boxing + *maybe_carry = Some(Box::new(this_msg)); + None } // something batched already, let's see if we can add this message to the batch ( @@ -783,7 +779,7 @@ impl PageServerHandler { pages: this_pages, effective_request_lsn: this_lsn, }, - ) if async { + ) if (|| { assert_eq!(this_pages.len(), 1); if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); @@ -805,8 +801,7 @@ impl PageServerHandler { return false; } true - } - .await => + })() => { // ok to batch accum_pages.extend(this_pages); @@ -815,9 +810,7 @@ impl PageServerHandler { // something batched already but this message is unbatchable (Some(_), this_msg) => { // by default, don't continue batching - let this_msg = Box::new(this_msg); // TODO: avoid this box - let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()"); - Some(carry) + Some(Box::new(this_msg)) // TODO: avoid re-box } } } @@ -1011,7 +1004,7 @@ impl PageServerHandler { let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); let request_span = info_span!("request", shard_id = tracing::field::Empty); - let read_message_task = tokio::spawn( + let read_message_task: JoinHandle> = tokio::spawn( { let cancel = self.cancel.child_token(); let ctx = ctx.attached_child(); @@ -1046,48 +1039,69 @@ impl PageServerHandler { } } } - anyhow::Ok((pgb_reader, timeline_handles)) + Ok((pgb_reader, timeline_handles)) } } .instrument(tracing::info_span!("read_protocol")), ); - let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); - let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); + enum BatchState { + Building(Option>), + UpstreamDead(Option>), + } + impl BatchState { + fn must_building_mut(&mut self) -> &mut Option> { + match self { + Self::Building(maybe_batch) => maybe_batch, + Self::UpstreamDead(_) => panic!("upstream dead"), + } + } + } + let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new( + std::sync::Mutex::new(BatchState::Building(None)), + )); + let notify_batcher = Arc::new(tokio::sync::Notify::new()); tokio::spawn( { - let ready_for_next_batch = Arc::clone(&ready_for_next_batch); + let notify_batcher = notify_batcher.clone(); async move { scopeguard::defer! { debug!("exiting"); } - let mut batch: Option> = None; loop { - let maybe_flush_msg = tokio::select! { - req = requests_rx.recv() => { - if let Some(req) = req { - Self::pagestream_do_batch(&mut batch, req).await - } else { - debug!("request processing pipeline upstream dead"); - std::mem::take(&mut batch) + let maybe_req = requests_rx.recv().await; + let Some(req) = maybe_req else { + batch_tx.send_modify(|pending_batch| { + let mut guard = pending_batch.lock().unwrap(); + match &mut *guard { + BatchState::Building(carry) => { + *guard = BatchState::UpstreamDead(carry.take()); + } + BatchState::UpstreamDead(_) => panic!("twice"), } - }, - () = ready_for_next_batch.notified() => { - debug!("downstream ready, flushing batch early if any available"); - std::mem::take(&mut batch) - } + }); + break; }; - let flush_msg = match maybe_flush_msg { - None => { - continue; - } - Some(flush_msg) => flush_msg, - }; - match batched_tx.send(flush_msg).await { - Ok(()) => {} - Err(_) => { - debug!("downstream is gone"); + // don't read new requests before this one has been processed + let mut req = Some(req); + loop { + let mut wait_notified = None; + let batched = batch_tx.send_if_modified(|pending_batch| { + let mut guard = pending_batch.lock().unwrap(); + let building = guard.must_building_mut(); + match Self::pagestream_do_batch(building, req.take().unwrap()) { + Some(req_was_not_batched) => { + req.replace(req_was_not_batched); + wait_notified = Some(notify_batcher.notified()); + false + } + None => true, + } + }); + if batched { break; + } else { + wait_notified.unwrap().await; } } } @@ -1096,26 +1110,30 @@ impl PageServerHandler { .instrument(tracing::info_span!("batching")), ); - loop { - let batch = match batched_rx.try_recv() { - Ok(batch) => batch, - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - ready_for_next_batch.notify_one(); - match batched_rx.recv().await { - Some(batch) => batch, - None => { - debug!( - "batched_rx observed as disconnected while waiting for next batch" - ); - break; - } - } - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - debug!("batched_rx observed as disconnected as part of try_recv()"); - break; + let mut stop = false; + while !stop { + match batch_rx.changed().await { + Ok(()) => {} + Err(_) => { + debug!("batch_rx observed disconnection of batcher"); } }; + let maybe_batch = { + let borrow = batch_rx.borrow(); + let mut guard = borrow.lock().unwrap(); + match &mut *guard { + BatchState::Building(maybe_batch) => maybe_batch.take(), + BatchState::UpstreamDead(maybe_batch) => { + debug!("upstream dead"); + stop = true; + maybe_batch.take() + } + } + }; + let Some(batch) = maybe_batch else { + break; + }; + notify_batcher.notify_one(); debug!("processing batch"); self.pagesteam_handle_batched_message(pgb, *batch, &ctx) .await?;