diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 341ea8247d..f2620d11f8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,6 +7,7 @@ use bytes::Buf; use futures::FutureExt; use itertools::Itertools; use nix::libc::SCTP_CURRENT_ASSOC; +use nix::sys::wait; use once_cell::sync::OnceCell; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ @@ -762,15 +763,11 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); match (maybe_carry.as_deref_mut(), *this_msg) { - // new batch - (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { + // nothing batched yet + (None, this_msg) => { *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing None } - // nothing batched yet and this message is unbatchable - (None, this_msg) => { - Some(Box::new(this_msg)); // TODO: avoid this re-boxing - } // something batched already, let's see if we can add this message to the batch ( Some(BatchedFeMessage::GetPage { @@ -816,7 +813,7 @@ impl PageServerHandler { } // something batched already but this message is unbatchable (Some(_), this_msg_derefed) => { - Some(Box::new(*this_msg_derefed)) // TODO: avoid this re-boxing + Some(Box::new(this_msg_derefed)) // TODO: avoid this re-boxing } } } @@ -1055,37 +1052,48 @@ impl PageServerHandler { Default::default(); let notify_data_ready = Arc::new(Notify::new()); let notify_data_emptied = Arc::new(Notify::new()); - let batcher = tokio::spawn({ + let mut batcher = tokio::spawn({ let current_batch = current_batch.clone(); let notify_data_ready = notify_data_ready.clone(); let notify_data_emptied = notify_data_emptied.clone(); + let cancel = self.cancel.child_token(); async move { scopeguard::defer! { debug!("exiting"); } loop { match requests_rx.recv().await { - Some(req) => { - let mut guard = current_batch.lock().unwrap(); - match Self::pagestream_do_batch(&mut guard, req) { - Some(req) => { - let emptied = notify_data_emptied.notified(); - let emptied = std::pin::pin!(emptied); - if emptied.enable() { - panic!("impossible, we're holding the lock"); + // NB: no need to select! for cancellation here because upstream checks + Some(mut req) => { + loop { + let wait_emptied = { + let mut guard = current_batch.lock().unwrap(); + match Self::pagestream_do_batch(&mut guard, req) { + None => { + // we successfully batched the message + notify_data_ready.notify_one(); + break; + } + Some(unbatched_req) => { + let emptied = notify_data_emptied.notified(); + let mut wait_for_emptied = Box::pin(emptied); // FIXME: noalloc + if wait_for_emptied.as_mut().enable() { + panic!("impossible, we're holding the lock"); + } + drop(guard); + req = unbatched_req; // assign back to retry in next iteration + wait_for_emptied + } + } + }; + tokio::select! { + () = wait_emptied => { + debug!("notified that batch was emptied"); + } + _ = cancel.cancelled() => { + debug!("cancellation requested"); + break; } - drop(guard); - emptied.await; - let guard = current_batch.lock().unwrap(); - let prev = guard.replace(req); - assert!( - prev.is_none(), - "we were notified that data emptied but there was data" - ); - } - None => { - // we successfully batched the message - notify_data_ready.notify_one(); } } } @@ -1100,32 +1108,34 @@ impl PageServerHandler { }); loop { - let batched = { - // hot path: take whatever got batched since last await - let guard = current_batch.lock().unwrap(); - if let Some(batched) = guard.take() { - notify_data_emptied.notify_one(); - drop(guard); - batched - } else { - // cold path: nothing was batched - let notified = notify_data_ready.notified(); - let notified = std::pin::pin!(notified); - if notified.enable() { - panic!("impossible, we're holding the lock"); - } - drop(guard); - tokio::select! { - batcher_res = batcher => { - let _: () = batcher_res.context("batcher panicked")?; - } - _ = notified => { - // we're notified that there's something to process - } - }; + let batched = loop { + let notified = { let mut guard = current_batch.lock().unwrap(); - guard.take().unwrap() - } + if let Some(batched) = guard.take() { + notify_data_emptied.notify_one(); + drop(guard); + break batched; + } else { + // cold path: nothing was batched + let notified = notify_data_ready.notified(); + let mut notified = Box::pin(notified); // FIXME: no alloc please + if notified.as_mut().enable() { + panic!("impossible, we're holding the lock"); + } + drop(guard); + notified + } + }; + tokio::select! { + batcher_res = &mut batcher => { + let _: () = batcher_res.context("batcher panicked")?; + break; + } + _ = notified => { + // we're notified that there's something to process + // => next iteration will find it + } + }; }; debug!("processing batch"); self.pagesteam_handle_batched_message(pgb, *batched, &ctx)