From 724519a21b5335f456631dbc8e2bb6488d6feac9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 29 Nov 2024 19:13:59 +0100 Subject: [PATCH] got the lifetimes to work --- pageserver/src/page_service.rs | 110 ++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 50 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ef2ca3a9fe..58a4fff5f8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -554,7 +554,8 @@ enum BatchedFeMessage<'c> { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>, + pages: + smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>, }, DbSize { span: Span, @@ -622,7 +623,7 @@ impl PageServerHandler { timeline_id: TimelineId, timeline_handles: &mut TimelineHandles, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &'c RequestContext, parent_span: Span, ) -> Result>, QueryError> where @@ -666,9 +667,11 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - let timer = shard - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx); + let timer = shard.query_metrics.start_timer_at( + metrics::SmgrQueryType::GetRelExists, + received_at, + ctx, + ); BatchedFeMessage::Exists { span, timer, @@ -682,9 +685,11 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - let timer = shard - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx); + let timer = shard.query_metrics.start_timer_at( + metrics::SmgrQueryType::GetRelSize, + received_at, + ctx, + ); BatchedFeMessage::Nblocks { span, timer, @@ -698,9 +703,11 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - let timer = shard - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at, ctx); + let timer = shard.query_metrics.start_timer_at( + metrics::SmgrQueryType::GetDbSize, + received_at, + ctx, + ); BatchedFeMessage::DbSize { span, timer, @@ -714,9 +721,11 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - let timer = shard - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at, ctx); + let timer = shard.query_metrics.start_timer_at( + metrics::SmgrQueryType::GetSlruSegment, + received_at, + ctx, + ); BatchedFeMessage::GetSlruSegment { span, timer, @@ -770,9 +779,11 @@ impl PageServerHandler { // It's important to start the timer before waiting for the LSN // so that the _started counters are incremented before we do // any serious waiting, e.g., for LSNs. - let timer = shard - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at, ctx); + let timer = shard.query_metrics.start_timer_at( + metrics::SmgrQueryType::GetPageAtLsn, + received_at, + ctx, + ); let effective_request_lsn = match Self::wait_or_get_last_lsn( &shard, @@ -879,7 +890,12 @@ impl PageServerHandler { { // invoke handler function let (handler_results, span): ( - Vec), PageStreamError>>, + Vec< + Result< + (PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer<'c>), + PageStreamError, + >, + >, _, ) = match batch { BatchedFeMessage::Exists { @@ -1268,13 +1284,17 @@ impl PageServerHandler { // Batcher // + let batcher_ctx = ctx.attached_child(); + let batcher_ctx_ref = &batcher_ctx; + let executor_ctx = ctx.attached_child(); + let executor_ctx_ref = &executor_ctx; + let cancel_batcher = self.cancel.child_token(); let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); let read_messages = pipeline_stage!( "read_messages", cancel_batcher.clone(), move |cancel_batcher| { - let ctx = ctx.attached_child(); async move { let mut pgb_reader = pgb_reader; let mut exit = false; @@ -1285,7 +1305,7 @@ impl PageServerHandler { timeline_id, &mut timeline_handles, &cancel_batcher, - &ctx, + batcher_ctx_ref, request_span.clone(), ) .await; @@ -1310,28 +1330,25 @@ impl PageServerHandler { // Executor // - let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| { - let ctx = ctx.attached_child(); - async move { - let _cancel_batcher = cancel_batcher.drop_guard(); - loop { - let maybe_batch = batch_rx.recv().await; - let batch = match maybe_batch { - Ok(batch) => batch, - Err(spsc_fold::RecvError::SenderGone) => { - debug!("upstream gone"); - return Ok(()); - } - }; - let batch = match batch { - Ok(batch) => batch, - Err(e) => { - return Err(e); - } - }; - self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) - .await?; - } + let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| async move { + let _cancel_batcher = cancel_batcher.drop_guard(); + loop { + let maybe_batch = batch_rx.recv().await; + let batch = match maybe_batch { + Ok(batch) => batch, + Err(spsc_fold::RecvError::SenderGone) => { + debug!("upstream gone"); + return Ok(()); + } + }; + let batch = match batch { + Ok(batch) => batch, + Err(e) => { + return Err(e); + } + }; + self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, executor_ctx_ref) + .await?; } }); @@ -1344,14 +1361,7 @@ impl PageServerHandler { tokio::join!(read_messages, executor) } PageServiceProtocolPipelinedExecutionStrategy::Tasks => { - // These tasks are not tracked anywhere. - let read_messages_task = tokio::spawn(read_messages); - let (read_messages_task_res, executor_res_) = - tokio::join!(read_messages_task, executor,); - ( - read_messages_task_res.expect("propagated panic from read_messages"), - executor_res_, - ) + todo!() } } }