got the lifetimes to work

This commit is contained in:
Christian Schwarz
2024-11-29 19:13:59 +01:00
parent 3993979a98
commit 724519a21b

View File

@@ -554,7 +554,8 @@ enum BatchedFeMessage<'c> {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>,
pages:
smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>,
},
DbSize {
span: Span,
@@ -622,7 +623,7 @@ impl PageServerHandler {
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &'c RequestContext,
parent_span: Span,
) -> Result<Option<BatchedFeMessage<'c>>, QueryError>
where
@@ -666,9 +667,11 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);
let timer = shard.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetRelExists,
received_at,
ctx,
);
BatchedFeMessage::Exists {
span,
timer,
@@ -682,9 +685,11 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);
let timer = shard.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetRelSize,
received_at,
ctx,
);
BatchedFeMessage::Nblocks {
span,
timer,
@@ -698,9 +703,11 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at, ctx);
let timer = shard.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetDbSize,
received_at,
ctx,
);
BatchedFeMessage::DbSize {
span,
timer,
@@ -714,9 +721,11 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at, ctx);
let timer = shard.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetSlruSegment,
received_at,
ctx,
);
BatchedFeMessage::GetSlruSegment {
span,
timer,
@@ -770,9 +779,11 @@ impl PageServerHandler {
// It's important to start the timer before waiting for the LSN
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for LSNs.
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at, ctx);
let timer = shard.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
ctx,
);
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
@@ -879,7 +890,12 @@ impl PageServerHandler {
{
// invoke handler function
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer<'c>), PageStreamError>>,
Vec<
Result<
(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer<'c>),
PageStreamError,
>,
>,
_,
) = match batch {
BatchedFeMessage::Exists {
@@ -1268,13 +1284,17 @@ impl PageServerHandler {
// Batcher
//
let batcher_ctx = ctx.attached_child();
let batcher_ctx_ref = &batcher_ctx;
let executor_ctx = ctx.attached_child();
let executor_ctx_ref = &executor_ctx;
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let read_messages = pipeline_stage!(
"read_messages",
cancel_batcher.clone(),
move |cancel_batcher| {
let ctx = ctx.attached_child();
async move {
let mut pgb_reader = pgb_reader;
let mut exit = false;
@@ -1285,7 +1305,7 @@ impl PageServerHandler {
timeline_id,
&mut timeline_handles,
&cancel_batcher,
&ctx,
batcher_ctx_ref,
request_span.clone(),
)
.await;
@@ -1310,28 +1330,25 @@ impl PageServerHandler {
// Executor
//
let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
let ctx = ctx.attached_child();
async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let maybe_batch = batch_rx.recv().await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
return Ok(());
}
};
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.await?;
}
let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let maybe_batch = batch_rx.recv().await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
return Ok(());
}
};
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, executor_ctx_ref)
.await?;
}
});
@@ -1344,14 +1361,7 @@ impl PageServerHandler {
tokio::join!(read_messages, executor)
}
PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
// These tasks are not tracked anywhere.
let read_messages_task = tokio::spawn(read_messages);
let (read_messages_task_res, executor_res_) =
tokio::join!(read_messages_task, executor,);
(
read_messages_task_res.expect("propagated panic from read_messages"),
executor_res_,
)
todo!()
}
}
}