diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1d7f230d4b..c272ccaf24 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -532,10 +532,12 @@ impl From for QueryError { enum BatchedFeMessage { Exists { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamNblocksRequest, }, GetPage { @@ -546,10 +548,12 @@ enum BatchedFeMessage { }, DbSize { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamGetSlruSegmentRequest, }, RespondError { @@ -601,11 +605,12 @@ impl PageServerHandler { async fn pagestream_read_message( pgb: &mut PostgresBackendReader, - ref tenant_id: TenantId, - ref timeline_id: TimelineId, + tenant_id: TenantId, + timeline_id: TimelineId, timeline_handles: &mut TimelineHandles, cancel: &CancellationToken, ctx: &RequestContext, + parent_span: Span, ) -> Result>, QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, @@ -644,22 +649,38 @@ impl PageServerHandler { let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let batched_msg = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists { - span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn), - req, - }, - PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks { - span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn), - req, - }, - PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize { - span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn), - req, - }, - PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment { - span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn), - req, - }, + PagestreamFeMessage::Exists(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::Exists { span, shard, req } + } + PagestreamFeMessage::Nblocks(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::Nblocks { span, shard, req } + } + PagestreamFeMessage::DbSize(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::DbSize { span, shard, req } + } + PagestreamFeMessage::GetSlruSegment(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::GetSlruSegment { span, shard, req } + } PagestreamFeMessage::GetPage(PagestreamGetPageRequest { request_lsn, not_modified_since, @@ -667,11 +688,7 @@ impl PageServerHandler { blkno, }) => { // shard_id is filled in by the handler - let span = tracing::info_span!( - "handle_get_page_at_lsn_request_batched", - %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, - batch_size = tracing::field::Empty, batch_id = tracing::field::Empty - ); + let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %request_lsn); macro_rules! respond_error { ($error:expr) => {{ @@ -685,8 +702,8 @@ impl PageServerHandler { let key = rel_block_to_key(rel, blkno); let shard = match timeline_handles - .get(*tenant_id, *timeline_id, ShardSelector::Page(key)) - .instrument(span.clone()) + .get(tenant_id, timeline_id, ShardSelector::Page(key)) + .instrument(span.clone()) // sets `shard_id` field .await { Ok(tl) => tl, @@ -808,8 +825,6 @@ impl PageServerHandler { #[instrument(level = tracing::Level::DEBUG, skip_all)] async fn pagesteam_handle_batched_message( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, pgb_writer: &mut PostgresBackend, batch: BatchedFeMessage, ctx: &RequestContext, @@ -820,22 +835,22 @@ impl PageServerHandler { // invoke handler function let (handler_results, span): (Vec>, _) = match batch { - BatchedFeMessage::Exists { span, req } => { + BatchedFeMessage::Exists { span, shard, req } => { fail::fail_point!("ps::handle-pagerequest-message::exists"); ( vec![ - self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_rel_exists_request(&shard, &req, &ctx) .instrument(span.clone()) .await, ], span, ) } - BatchedFeMessage::Nblocks { span, req } => { + BatchedFeMessage::Nblocks { span, shard, req } => { fail::fail_point!("ps::handle-pagerequest-message::nblocks"); ( vec![ - self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_nblocks_request(&shard, &req, &ctx) .instrument(span.clone()) .await, ], @@ -868,29 +883,24 @@ impl PageServerHandler { span, ) } - BatchedFeMessage::DbSize { span, req } => { + BatchedFeMessage::DbSize { span, shard, req } => { fail::fail_point!("ps::handle-pagerequest-message::dbsize"); ( vec![ - self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) + self.handle_db_size_request(&shard, &req, &ctx) .instrument(span.clone()) .await, ], span, ) } - BatchedFeMessage::GetSlruSegment { span, req } => { + BatchedFeMessage::GetSlruSegment { span, shard, req } => { fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); ( vec![ - self.handle_get_slru_segment_request( - tenant_id, - timeline_id, - &req, - &ctx, - ) - .instrument(span.clone()) - .await, + self.handle_get_slru_segment_request(&shard, &req, &ctx) + .instrument(span.clone()) + .await, ], span, ) @@ -1000,11 +1010,15 @@ impl PageServerHandler { .expect("implementation error: timeline_handles should not be locked"); let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); + let request_span = info_span!("request", shard_id = tracing::field::Empty); let read_message_task = tokio::spawn( { let cancel = self.cancel.child_token(); let ctx = ctx.attached_child(); async move { + scopeguard::defer! { + debug!("exiting"); + } let mut pgb_reader = pgb_reader; loop { let msg = Self::pagestream_read_message( @@ -1014,6 +1028,7 @@ impl PageServerHandler { &mut timeline_handles, &cancel, &ctx, + request_span.clone(), ) .await?; let msg = match msg { @@ -1034,7 +1049,7 @@ impl PageServerHandler { anyhow::Ok((pgb_reader, timeline_handles)) } } - .instrument(tracing::info_span!("reading")), + .instrument(tracing::info_span!("read_protocol")), ); let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); @@ -1043,6 +1058,9 @@ impl PageServerHandler { { let ready_for_next_batch = Arc::clone(&ready_for_next_batch); async move { + scopeguard::defer! { + debug!("exiting"); + } let mut batch: Option> = None; loop { let maybe_flush_msg = tokio::select! { @@ -1099,7 +1117,7 @@ impl PageServerHandler { } }; debug!("processing batch"); - self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx) + self.pagesteam_handle_batched_message(pgb, *batch, &ctx) .await?; } @@ -1256,17 +1274,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_rel_exists_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .as_mut() - .unwrap() - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); @@ -1293,18 +1304,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_nblocks_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .as_mut() - .unwrap() - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelSize, ctx); @@ -1331,18 +1334,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_db_size_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .as_mut() - .unwrap() - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetDbSize, ctx); @@ -1397,18 +1392,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_slru_segment_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .as_mut() - .unwrap() - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);