From db9093f938a2c047c069b148fa31a454b8681981 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 22:07:05 +0100 Subject: [PATCH] revert back to 'span fixes' commit --- pageserver/src/page_service.rs | 163 ++++++++++++++------------------- 1 file changed, 71 insertions(+), 92 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f2620d11f8..c272ccaf24 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -6,8 +6,6 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use futures::FutureExt; use itertools::Itertools; -use nix::libc::SCTP_CURRENT_ASSOC; -use nix::sys::wait; use once_cell::sync::OnceCell; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ @@ -24,7 +22,6 @@ use postgres_backend::{ use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; -use smallvec::SmallVec; use std::borrow::Cow; use std::io; use std::str; @@ -34,7 +31,6 @@ use std::time::SystemTime; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncWriteExt, BufWriter}; -use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; @@ -754,20 +750,24 @@ impl PageServerHandler { Ok(Some(Box::new(batched_msg))) } - /// POst-condition: maybe_carry.is_some() + /// Post-condition: `maybe_carry` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] - fn pagestream_do_batch( + async fn pagestream_do_batch( maybe_carry: &mut Option>, - mut this_msg: Box, + this_msg: Box, ) -> Option> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); match (maybe_carry.as_deref_mut(), *this_msg) { - // nothing batched yet - (None, this_msg) => { + // new batch + (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing None } + // nothing batched yet and this message is unbatchable + (None, this_msg) => { + Some(Box::new(this_msg)) // TODO: avoid this re-boxing + } // something batched already, let's see if we can add this message to the batch ( Some(BatchedFeMessage::GetPage { @@ -783,7 +783,7 @@ impl PageServerHandler { pages: this_pages, effective_request_lsn: this_lsn, }, - ) if (|| { + ) if async { assert_eq!(this_pages.len(), 1); if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); @@ -805,15 +805,19 @@ impl PageServerHandler { return false; } true - })() => + } + .await => { // ok to batch accum_pages.extend(this_pages); None } // something batched already but this message is unbatchable - (Some(_), this_msg_derefed) => { - Some(Box::new(this_msg_derefed)) // TODO: avoid this re-boxing + (Some(_), this_msg) => { + // by default, don't continue batching + let this_msg = Box::new(this_msg); // TODO: avoid this box + let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()"); + Some(carry) } } } @@ -1048,97 +1052,72 @@ impl PageServerHandler { .instrument(tracing::info_span!("read_protocol")), ); - let current_batch: Arc>>> = - Default::default(); - let notify_data_ready = Arc::new(Notify::new()); - let notify_data_emptied = Arc::new(Notify::new()); - let mut batcher = tokio::spawn({ - let current_batch = current_batch.clone(); - let notify_data_ready = notify_data_ready.clone(); - let notify_data_emptied = notify_data_emptied.clone(); - let cancel = self.cancel.child_token(); - async move { - scopeguard::defer! { - debug!("exiting"); - } - loop { - match requests_rx.recv().await { - // NB: no need to select! for cancellation here because upstream checks - Some(mut req) => { - loop { - let wait_emptied = { - let mut guard = current_batch.lock().unwrap(); - match Self::pagestream_do_batch(&mut guard, req) { - None => { - // we successfully batched the message - notify_data_ready.notify_one(); - break; - } - Some(unbatched_req) => { - let emptied = notify_data_emptied.notified(); - let mut wait_for_emptied = Box::pin(emptied); // FIXME: noalloc - if wait_for_emptied.as_mut().enable() { - panic!("impossible, we're holding the lock"); - } - drop(guard); - req = unbatched_req; // assign back to retry in next iteration - wait_for_emptied - } - } - }; - tokio::select! { - () = wait_emptied => { - debug!("notified that batch was emptied"); - } - _ = cancel.cancelled() => { - debug!("cancellation requested"); - break; - } + let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); + let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); + tokio::spawn( + { + let ready_for_next_batch = Arc::clone(&ready_for_next_batch); + async move { + scopeguard::defer! { + debug!("exiting"); + } + let mut batch: Option> = None; + loop { + let maybe_flush_msg = tokio::select! { + req = requests_rx.recv() => { + if let Some(req) = req { + Self::pagestream_do_batch(&mut batch, req).await + } else { + debug!("request processing pipeline upstream dead"); + std::mem::take(&mut batch) } + }, + () = ready_for_next_batch.notified() => { + debug!("downstream ready, flushing batch early if any available"); + std::mem::take(&mut batch) + } + }; + let flush_msg = match maybe_flush_msg { + None => { + continue; + } + Some(flush_msg) => flush_msg, + }; + match batched_tx.send(flush_msg).await { + Ok(()) => {} + Err(_) => { + debug!("downstream is gone"); + break; } - } - None => { - debug!("request processing pipeline upstream dead"); - return; } } } } - .instrument(tracing::info_span!("batching")) - }); + .instrument(tracing::info_span!("batching")), + ); loop { - let batched = loop { - let notified = { - let mut guard = current_batch.lock().unwrap(); - if let Some(batched) = guard.take() { - notify_data_emptied.notify_one(); - drop(guard); - break batched; - } else { - // cold path: nothing was batched - let notified = notify_data_ready.notified(); - let mut notified = Box::pin(notified); // FIXME: no alloc please - if notified.as_mut().enable() { - panic!("impossible, we're holding the lock"); + let batch = match batched_rx.try_recv() { + Ok(batch) => batch, + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + ready_for_next_batch.notify_one(); + match batched_rx.recv().await { + Some(batch) => batch, + None => { + debug!( + "batched_rx observed as disconnected while waiting for next batch" + ); + break; } - drop(guard); - notified } - }; - tokio::select! { - batcher_res = &mut batcher => { - let _: () = batcher_res.context("batcher panicked")?; - break; - } - _ = notified => { - // we're notified that there's something to process - // => next iteration will find it - } - }; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + debug!("batched_rx observed as disconnected as part of try_recv()"); + break; + } }; debug!("processing batch"); - self.pagesteam_handle_batched_message(pgb, *batched, &ctx) + self.pagesteam_handle_batched_message(pgb, *batch, &ctx) .await?; }