From 408bc8fc716e6d4c33ba34b6208e056bfbb02144 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 19:42:43 +0100 Subject: [PATCH] cleanups --- pageserver/src/page_service.rs | 72 +++++++++++++++++----------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d246e5e00b..1d7f230d4b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -733,18 +733,25 @@ impl PageServerHandler { Ok(Some(Box::new(batched_msg))) } + /// Post-condition: `maybe_carry` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] async fn pagestream_do_batch( maybe_carry: &mut Option>, - arg: Option>, + this_msg: Box, ) -> Option> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - match (maybe_carry.as_deref_mut(), arg.map(|x| *x)) { - (None, Some(arg)) => { - *maybe_carry = Some(Box::new(arg)); // TODO: avoid this boxing + match (maybe_carry.as_deref_mut(), *this_msg) { + // new batch + (None, this_msg @ BatchedFeMessage::GetPage { .. }) => { + *maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing None } + // nothing batched yet and this message is unbatchable + (None, this_msg) => { + Some(Box::new(this_msg)) // TODO: avoid this re-boxing + } + // something batched already, let's see if we can add this message to the batch ( Some(BatchedFeMessage::GetPage { span: _, @@ -753,12 +760,12 @@ impl PageServerHandler { effective_request_lsn: accum_lsn, }), // would be nice to have box pattern here - Some(BatchedFeMessage::GetPage { + BatchedFeMessage::GetPage { span: _, shard: this_shard, pages: this_pages, effective_request_lsn: this_lsn, - }), + }, ) if async { assert_eq!(this_pages.len(), 1); if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { @@ -788,21 +795,17 @@ impl PageServerHandler { accum_pages.extend(this_pages); None } - (Some(_), Some(this_msg)) => { + // something batched already but this message is unbatchable + (Some(_), this_msg) => { // by default, don't continue batching let this_msg = Box::new(this_msg); // TODO: avoid this box let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()"); Some(carry) } - (Some(_), None) => { - Some(maybe_carry.take().expect("this match arm checks it's Some()")) - } - (None, None) => { - None // TODO: can we prevent this branch trhough the typesystem - } } } + #[instrument(level = tracing::Level::DEBUG, skip_all)] async fn pagesteam_handle_batched_message( &mut self, tenant_id: TenantId, @@ -1013,10 +1016,17 @@ impl PageServerHandler { &ctx, ) .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + break; + } + }; match requests_tx.send(msg).await { Ok(()) => {} Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("request processing pipeline downstream dead"); + debug!("downstream is gone"); break; } } @@ -1034,45 +1044,32 @@ impl PageServerHandler { let ready_for_next_batch = Arc::clone(&ready_for_next_batch); async move { let mut batch: Option> = None; - let mut stop = false; - while !stop { + loop { let maybe_flush_msg = tokio::select! { req = requests_rx.recv() => { - let arg = match req { - Some(Some(req)) => Some(req), - Some(None) => { - debug!("upstream task observed end of pagestream protocol"); - None - } - None => { - debug!("upstream task observed protocol error"); - None - } - }; - if arg.is_none() { - stop = true; + if let Some(req) = req { + Self::pagestream_do_batch(&mut batch, req).await + } else { + debug!("request processing pipeline upstream dead"); + std::mem::take(&mut batch) } - Self::pagestream_do_batch(&mut batch, arg).await }, () = ready_for_next_batch.notified() => { - debug!("downstream ready, flushing batch early"); - // pass None so the batch gets flushed - Self::pagestream_do_batch(&mut batch, None).await + debug!("downstream ready, flushing batch early if any available"); + std::mem::take(&mut batch) } }; let flush_msg = match maybe_flush_msg { None => { - debug!("not batching"); continue; } Some(flush_msg) => flush_msg, }; - debug!("flushing batch"); match batched_tx.send(flush_msg).await { Ok(()) => {} Err(_) => { - debug!("batched messages consumer is gone"); - stop = true; + debug!("downstream is gone"); + break; } } } @@ -1101,6 +1098,7 @@ impl PageServerHandler { break; } }; + debug!("processing batch"); self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx) .await?; }