From 56de07154edeb5761a1e11a0850e0a789d167ea5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 20:46:56 +0100 Subject: [PATCH] fruitless debugging --- pageserver/src/page_service.rs | 148 +++++++++++++++++---------------- 1 file changed, 76 insertions(+), 72 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c272ccaf24..412463d1e5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -22,6 +22,7 @@ use postgres_backend::{ use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; +use smallvec::SmallVec; use std::borrow::Cow; use std::io; use std::str; @@ -754,72 +755,79 @@ impl PageServerHandler { #[instrument(skip_all, level = tracing::Level::TRACE)] async fn pagestream_do_batch( maybe_carry: &mut Option>, - this_msg: Box, - ) -> Option> { + mut this_msg: Box, + ) -> smallvec::SmallVec<[Box; 2]> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - match (maybe_carry.as_deref_mut(), *this_msg) { - // new batch - (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { - *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing - None - } - // nothing batched yet and this message is unbatchable - (None, this_msg) => { - Some(Box::new(this_msg)) // TODO: avoid this re-boxing - } - // something batched already, let's see if we can add this message to the batch - ( - Some(BatchedFeMessage::GetPage { - span: _, - shard: accum_shard, - pages: ref mut accum_pages, - effective_request_lsn: accum_lsn, - }), - // would be nice to have box pattern here - BatchedFeMessage::GetPage { - span: _, - shard: this_shard, - pages: this_pages, - effective_request_lsn: this_lsn, - }, - ) if async { - assert_eq!(this_pages.len(), 1); - if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { - trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); - assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); - return false; + let mut ret = SmallVec::new(); + + loop { + match (maybe_carry.as_deref_mut(), *this_msg) { + // new batch + (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { + *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing + break; } - if (accum_shard.tenant_shard_id, accum_shard.timeline_id) - != (this_shard.tenant_shard_id, this_shard.timeline_id) + // nothing batched yet and this message is unbatchable + (None, this_msg) => { + ret.push(Box::new(this_msg)); // TODO: avoid this re-boxing + break; + } + // something batched already, let's see if we can add this message to the batch + ( + Some(BatchedFeMessage::GetPage { + span: _, + shard: accum_shard, + pages: ref mut accum_pages, + effective_request_lsn: accum_lsn, + }), + // would be nice to have box pattern here + BatchedFeMessage::GetPage { + span: _, + shard: this_shard, + pages: this_pages, + effective_request_lsn: this_lsn, + }, + ) if async { + assert_eq!(this_pages.len(), 1); + if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { + trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); + assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); + return false; + } + if (accum_shard.tenant_shard_id, accum_shard.timeline_id) + != (this_shard.tenant_shard_id, this_shard.timeline_id) + { + trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); + // TODO: we _could_ batch & execute each shard seperately (and in parallel). + // But the current logic for keeping responses in order does not support that. + return false; + } + // the vectored get currently only supports a single LSN, so, bounce as soon + // as the effective request_lsn changes + if *accum_lsn != this_lsn { + trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); + return false; + } + true + } + .await => { - trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); - // TODO: we _could_ batch & execute each shard seperately (and in parallel). - // But the current logic for keeping responses in order does not support that. - return false; + // ok to batch + accum_pages.extend(this_pages); + break; } - // the vectored get currently only supports a single LSN, so, bounce as soon - // as the effective request_lsn changes - if *accum_lsn != this_lsn { - trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); - return false; + // something batched already but this message is unbatchable + (Some(_), this_msg_derefed) => { + let carry = maybe_carry.take().expect("this match arm checks it's Some()"); + ret.push(carry); + this_msg = Box::new(this_msg_derefed); // TODO: avoid this re-boxing + // next iteration will decide what to do } - true - } - .await => - { - // ok to batch - accum_pages.extend(this_pages); - None - } - // something batched already but this message is unbatchable - (Some(_), this_msg) => { - // by default, don't continue batching - let this_msg = Box::new(this_msg); // TODO: avoid this box - let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()"); - Some(carry) } } + assert!(!ret.spilled()); + ret } #[instrument(level = tracing::Level::DEBUG, skip_all)] @@ -1063,31 +1071,27 @@ impl PageServerHandler { } let mut batch: Option> = None; loop { - let maybe_flush_msg = tokio::select! { + let flush_msgs: smallvec::SmallVec<[_; 2]> = tokio::select! { req = requests_rx.recv() => { if let Some(req) = req { Self::pagestream_do_batch(&mut batch, req).await } else { debug!("request processing pipeline upstream dead"); - std::mem::take(&mut batch) + std::mem::take(&mut batch).into_iter().collect() } }, () = ready_for_next_batch.notified() => { debug!("downstream ready, flushing batch early if any available"); - std::mem::take(&mut batch) + std::mem::take(&mut batch).into_iter().collect() } }; - let flush_msg = match maybe_flush_msg { - None => { - continue; - } - Some(flush_msg) => flush_msg, - }; - match batched_tx.send(flush_msg).await { - Ok(()) => {} - Err(_) => { - debug!("downstream is gone"); - break; + for msg in flush_msgs { + match batched_tx.send(msg).await { + Ok(()) => {} + Err(tokio::sync::mpsc::error::SendError(_)) => { + debug!("downstream is gone"); + break; + } } } }