From 7680aa12a8dfc7943ded051f7bad1ddf21d195ec Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 21:34:58 +0100 Subject: [PATCH] draft --- pageserver/src/page_service.rs | 237 +++++++++++++++++---------------- 1 file changed, 122 insertions(+), 115 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 412463d1e5..341ea8247d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -6,6 +6,7 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use futures::FutureExt; use itertools::Itertools; +use nix::libc::SCTP_CURRENT_ASSOC; use once_cell::sync::OnceCell; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ @@ -32,6 +33,7 @@ use std::time::SystemTime; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; @@ -751,83 +753,72 @@ impl PageServerHandler { Ok(Some(Box::new(batched_msg))) } - /// Post-condition: `maybe_carry` is Some() + /// POst-condition: maybe_carry.is_some() #[instrument(skip_all, level = tracing::Level::TRACE)] - async fn pagestream_do_batch( + fn pagestream_do_batch( maybe_carry: &mut Option>, mut this_msg: Box, - ) -> smallvec::SmallVec<[Box; 2]> { + ) -> Option> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let mut ret = SmallVec::new(); - - loop { - match (maybe_carry.as_deref_mut(), *this_msg) { - // new batch - (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { - *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing - break; + match (maybe_carry.as_deref_mut(), *this_msg) { + // new batch + (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { + *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing + None + } + // nothing batched yet and this message is unbatchable + (None, this_msg) => { + Some(Box::new(this_msg)); // TODO: avoid this re-boxing + } + // something batched already, let's see if we can add this message to the batch + ( + Some(BatchedFeMessage::GetPage { + span: _, + shard: accum_shard, + pages: ref mut accum_pages, + effective_request_lsn: accum_lsn, + }), + // would be nice to have box pattern here + BatchedFeMessage::GetPage { + span: _, + shard: this_shard, + pages: this_pages, + effective_request_lsn: this_lsn, + }, + ) if (|| { + assert_eq!(this_pages.len(), 1); + if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { + trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); + assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); + return false; } - // nothing batched yet and this message is unbatchable - (None, this_msg) => { - ret.push(Box::new(this_msg)); // TODO: avoid this re-boxing - break; - } - // something batched already, let's see if we can add this message to the batch - ( - Some(BatchedFeMessage::GetPage { - span: _, - shard: accum_shard, - pages: ref mut accum_pages, - effective_request_lsn: accum_lsn, - }), - // would be nice to have box pattern here - BatchedFeMessage::GetPage { - span: _, - shard: this_shard, - pages: this_pages, - effective_request_lsn: this_lsn, - }, - ) if async { - assert_eq!(this_pages.len(), 1); - if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { - trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); - assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); - return false; - } - if (accum_shard.tenant_shard_id, accum_shard.timeline_id) - != (this_shard.tenant_shard_id, this_shard.timeline_id) - { - trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); - // TODO: we _could_ batch & execute each shard seperately (and in parallel). - // But the current logic for keeping responses in order does not support that. - return false; - } - // the vectored get currently only supports a single LSN, so, bounce as soon - // as the effective request_lsn changes - if *accum_lsn != this_lsn { - trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); - return false; - } - true - } - .await => + if (accum_shard.tenant_shard_id, accum_shard.timeline_id) + != (this_shard.tenant_shard_id, this_shard.timeline_id) { - // ok to batch - accum_pages.extend(this_pages); - break; + trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); + // TODO: we _could_ batch & execute each shard seperately (and in parallel). + // But the current logic for keeping responses in order does not support that. + return false; } - // something batched already but this message is unbatchable - (Some(_), this_msg_derefed) => { - let carry = maybe_carry.take().expect("this match arm checks it's Some()"); - ret.push(carry); - this_msg = Box::new(this_msg_derefed); // TODO: avoid this re-boxing - // next iteration will decide what to do + // the vectored get currently only supports a single LSN, so, bounce as soon + // as the effective request_lsn changes + if *accum_lsn != this_lsn { + trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); + return false; } + true + })() => + { + // ok to batch + accum_pages.extend(this_pages); + None + } + // something batched already but this message is unbatchable + (Some(_), this_msg_derefed) => { + Some(Box::new(*this_msg_derefed)) // TODO: avoid this re-boxing } } - assert!(!ret.spilled()); - ret } #[instrument(level = tracing::Level::DEBUG, skip_all)] @@ -1060,68 +1051,84 @@ impl PageServerHandler { .instrument(tracing::info_span!("read_protocol")), ); - let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); - let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); - tokio::spawn( - { - let ready_for_next_batch = Arc::clone(&ready_for_next_batch); - async move { - scopeguard::defer! { - debug!("exiting"); - } - let mut batch: Option> = None; - loop { - let flush_msgs: smallvec::SmallVec<[_; 2]> = tokio::select! { - req = requests_rx.recv() => { - if let Some(req) = req { - Self::pagestream_do_batch(&mut batch, req).await - } else { - debug!("request processing pipeline upstream dead"); - std::mem::take(&mut batch).into_iter().collect() + let current_batch: Arc>>> = + Default::default(); + let notify_data_ready = Arc::new(Notify::new()); + let notify_data_emptied = Arc::new(Notify::new()); + let batcher = tokio::spawn({ + let current_batch = current_batch.clone(); + let notify_data_ready = notify_data_ready.clone(); + let notify_data_emptied = notify_data_emptied.clone(); + async move { + scopeguard::defer! { + debug!("exiting"); + } + loop { + match requests_rx.recv().await { + Some(req) => { + let mut guard = current_batch.lock().unwrap(); + match Self::pagestream_do_batch(&mut guard, req) { + Some(req) => { + let emptied = notify_data_emptied.notified(); + let emptied = std::pin::pin!(emptied); + if emptied.enable() { + panic!("impossible, we're holding the lock"); + } + drop(guard); + emptied.await; + let guard = current_batch.lock().unwrap(); + let prev = guard.replace(req); + assert!( + prev.is_none(), + "we were notified that data emptied but there was data" + ); } - }, - () = ready_for_next_batch.notified() => { - debug!("downstream ready, flushing batch early if any available"); - std::mem::take(&mut batch).into_iter().collect() - } - }; - for msg in flush_msgs { - match batched_tx.send(msg).await { - Ok(()) => {} - Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("downstream is gone"); - break; + None => { + // we successfully batched the message + notify_data_ready.notify_one(); } } } + None => { + debug!("request processing pipeline upstream dead"); + return; + } } } } - .instrument(tracing::info_span!("batching")), - ); + .instrument(tracing::info_span!("batching")) + }); loop { - let batch = match batched_rx.try_recv() { - Ok(batch) => batch, - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - ready_for_next_batch.notify_one(); - match batched_rx.recv().await { - Some(batch) => batch, - None => { - debug!( - "batched_rx observed as disconnected while waiting for next batch" - ); - break; - } + let batched = { + // hot path: take whatever got batched since last await + let guard = current_batch.lock().unwrap(); + if let Some(batched) = guard.take() { + notify_data_emptied.notify_one(); + drop(guard); + batched + } else { + // cold path: nothing was batched + let notified = notify_data_ready.notified(); + let notified = std::pin::pin!(notified); + if notified.enable() { + panic!("impossible, we're holding the lock"); } - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - debug!("batched_rx observed as disconnected as part of try_recv()"); - break; + drop(guard); + tokio::select! { + batcher_res = batcher => { + let _: () = batcher_res.context("batcher panicked")?; + } + _ = notified => { + // we're notified that there's something to process + } + }; + let mut guard = current_batch.lock().unwrap(); + guard.take().unwrap() } }; debug!("processing batch"); - self.pagesteam_handle_batched_message(pgb, *batch, &ctx) + self.pagesteam_handle_batched_message(pgb, *batched, &ctx) .await?; }