From 9b65b268eda6eb1f154f5b0fe0f3cc902a6cfe5b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 29 Nov 2024 16:14:03 +0100 Subject: [PATCH] stop Box'ing stuff & clean up the passing-through of errors (remove enum Batch) --- pageserver/src/page_service.rs | 91 +++++++++++++--------------------- 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 85d4cf91f3..1645dfc9d0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -620,7 +620,7 @@ impl PageServerHandler { cancel: &CancellationToken, ctx: &RequestContext, parent_span: Span, - ) -> Result>, QueryError> + ) -> Result, QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { @@ -700,7 +700,7 @@ impl PageServerHandler { span, error: $error, }; - Ok(Some(Box::new(error))) + Ok(Some(error)) }}; } @@ -751,7 +751,7 @@ impl PageServerHandler { } } }; - Ok(Some(Box::new(batched_msg))) + Ok(Some(batched_msg)) } /// Post-condition: `batch` is Some() @@ -759,21 +759,25 @@ impl PageServerHandler { #[allow(clippy::boxed_local)] fn pagestream_do_batch( max_batch_size: NonZeroUsize, - batch: &mut Box, - this_msg: Box, - ) -> Result<(), Box> { + batch: &mut Result, + this_msg: Result, + ) -> Result<(), Result> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - match (&mut **batch, *this_msg) { + let this_msg = match this_msg { + Ok(this_msg) => this_msg, + Err(e) => return Err(Err(e)), + }; + + match (&mut *batch, this_msg) { // something batched already, let's see if we can add this message to the batch ( - BatchedFeMessage::GetPage { + Ok(BatchedFeMessage::GetPage { span: _, shard: accum_shard, pages: ref mut accum_pages, effective_request_lsn: accum_lsn, - }, - // would be nice to have box pattern here + }), BatchedFeMessage::GetPage { span: _, shard: this_shard, @@ -811,7 +815,7 @@ impl PageServerHandler { // something batched already but this message is unbatchable (_, this_msg) => { // by default, don't continue batching - Err(Box::new(this_msg)) // TODO: avoid re-box + Err(Ok(this_msg)) } } } @@ -1085,7 +1089,7 @@ impl PageServerHandler { } }; let err = self - .pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx) + .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx) .await; match err { Ok(()) => {} @@ -1191,11 +1195,7 @@ impl PageServerHandler { // let cancel_batcher = self.cancel.child_token(); - enum Batch { - Request(Box), - ReadError(QueryError), - } - let (mut batch_tx, mut batch_rx) = spsc_fold::channel::(); + let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); let read_messages = pipeline_stage!( "read_messages", cancel_batcher.clone(), @@ -1205,7 +1205,7 @@ impl PageServerHandler { let mut pgb_reader = pgb_reader; let mut exit = false; while !exit { - let res = Self::pagestream_read_message( + let read_res = Self::pagestream_read_message( &mut pgb_reader, tenant_id, timeline_id, @@ -1215,37 +1215,17 @@ impl PageServerHandler { request_span.clone(), ) .await; - exit |= res.is_err(); - let send_res = match res { - Ok(None) => { - debug!("sub-protocol client-initiated shutdown"); - break; - } - Ok(Some(req)) => { - batch_tx - .send(Batch::Request(req), |batch, req| match (batch, req) { - (Batch::Request(ref mut batch), Batch::Request(req)) => { - Self::pagestream_do_batch(max_batch_size, batch, req) - .map_err(Batch::Request) - } - (Batch::Request(_), x @ Batch::ReadError(_)) => Err(x), - ( - Batch::ReadError(_), - Batch::Request(_) | Batch::ReadError(_), - ) => { - unreachable!( - "we exit from batcher after storing a read error" - ); - } - }) - .await - } - Err(e) => { - exit = true; - batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await - } + let Some(read_res) = read_res.transpose() else { + debug!("client-initiated shutdown"); + break; }; - exit |= send_res.is_err(); + exit |= read_res.is_err(); + let could_send = batch_tx + .send(read_res, |batch, res| { + Self::pagestream_do_batch(max_batch_size, batch, res) + }) + .await; + exit |= could_send.is_err(); } (pgb_reader, timeline_handles) } @@ -1269,17 +1249,14 @@ impl PageServerHandler { return Ok(()); } }; - match batch { - Batch::Request(batch) => { - self.pagesteam_handle_batched_message( - pgb_writer, *batch, &cancel, &ctx, - ) - .await?; - } - Batch::ReadError(e) => { + let batch = match batch { + Ok(batch) => batch, + Err(e) => { return Err(e); } - } + }; + self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) + .await?; } } });