stop Box'ing stuff & clean up the passing-through of errors (remove enum Batch)

This commit is contained in:
Christian Schwarz
2024-11-29 16:14:03 +01:00
parent 53e18b24fc
commit 9b65b268ed

View File

@@ -620,7 +620,7 @@ impl PageServerHandler {
cancel: &CancellationToken,
ctx: &RequestContext,
parent_span: Span,
) -> Result<Option<Box<BatchedFeMessage>>, QueryError>
) -> Result<Option<BatchedFeMessage>, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
@@ -700,7 +700,7 @@ impl PageServerHandler {
span,
error: $error,
};
Ok(Some(Box::new(error)))
Ok(Some(error))
}};
}
@@ -751,7 +751,7 @@ impl PageServerHandler {
}
}
};
Ok(Some(Box::new(batched_msg)))
Ok(Some(batched_msg))
}
/// Post-condition: `batch` is Some()
@@ -759,21 +759,25 @@ impl PageServerHandler {
#[allow(clippy::boxed_local)]
fn pagestream_do_batch(
max_batch_size: NonZeroUsize,
batch: &mut Box<BatchedFeMessage>,
this_msg: Box<BatchedFeMessage>,
) -> Result<(), Box<BatchedFeMessage>> {
batch: &mut Result<BatchedFeMessage, QueryError>,
this_msg: Result<BatchedFeMessage, QueryError>,
) -> Result<(), Result<BatchedFeMessage, QueryError>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
match (&mut **batch, *this_msg) {
let this_msg = match this_msg {
Ok(this_msg) => this_msg,
Err(e) => return Err(Err(e)),
};
match (&mut *batch, this_msg) {
// something batched already, let's see if we can add this message to the batch
(
BatchedFeMessage::GetPage {
Ok(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
},
// would be nice to have box pattern here
}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
@@ -811,7 +815,7 @@ impl PageServerHandler {
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
Err(Box::new(this_msg)) // TODO: avoid re-box
Err(Ok(this_msg))
}
}
}
@@ -1085,7 +1089,7 @@ impl PageServerHandler {
}
};
let err = self
.pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
.pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx)
.await;
match err {
Ok(()) => {}
@@ -1191,11 +1195,7 @@ impl PageServerHandler {
//
let cancel_batcher = self.cancel.child_token();
enum Batch {
Request(Box<BatchedFeMessage>),
ReadError(QueryError),
}
let (mut batch_tx, mut batch_rx) = spsc_fold::channel::<Batch>();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let read_messages = pipeline_stage!(
"read_messages",
cancel_batcher.clone(),
@@ -1205,7 +1205,7 @@ impl PageServerHandler {
let mut pgb_reader = pgb_reader;
let mut exit = false;
while !exit {
let res = Self::pagestream_read_message(
let read_res = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
@@ -1215,37 +1215,17 @@ impl PageServerHandler {
request_span.clone(),
)
.await;
exit |= res.is_err();
let send_res = match res {
Ok(None) => {
debug!("sub-protocol client-initiated shutdown");
break;
}
Ok(Some(req)) => {
batch_tx
.send(Batch::Request(req), |batch, req| match (batch, req) {
(Batch::Request(ref mut batch), Batch::Request(req)) => {
Self::pagestream_do_batch(max_batch_size, batch, req)
.map_err(Batch::Request)
}
(Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
(
Batch::ReadError(_),
Batch::Request(_) | Batch::ReadError(_),
) => {
unreachable!(
"we exit from batcher after storing a read error"
);
}
})
.await
}
Err(e) => {
exit = true;
batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await
}
let Some(read_res) = read_res.transpose() else {
debug!("client-initiated shutdown");
break;
};
exit |= send_res.is_err();
exit |= read_res.is_err();
let could_send = batch_tx
.send(read_res, |batch, res| {
Self::pagestream_do_batch(max_batch_size, batch, res)
})
.await;
exit |= could_send.is_err();
}
(pgb_reader, timeline_handles)
}
@@ -1269,17 +1249,14 @@ impl PageServerHandler {
return Ok(());
}
};
match batch {
Batch::Request(batch) => {
self.pagesteam_handle_batched_message(
pgb_writer, *batch, &cancel, &ctx,
)
.await?;
}
Batch::ReadError(e) => {
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
}
};
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.await?;
}
}
});