diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 99d0dc4250..f50f6d9913 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -650,7 +650,7 @@ impl PageServerHandler { rel, blkno, }) => { - let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty); + let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty, batch_id = tracing::field::Empty); let key = rel_block_to_key(rel, blkno); let shard = match self .timeline_handles @@ -793,6 +793,12 @@ impl PageServerHandler { } => { CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64); span.record("batch_size", pages.len() as u64); + static BATCH_ID: Lazy = + Lazy::new(|| std::sync::atomic::AtomicUsize::new(0)); + span.record( + "batch_id", + BATCH_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as u64, + ); fail::fail_point!("ps::handle-pagerequest-message::getpage"); // shard_id is filled in by the handler ( diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2a9b83d0c2..17e11a0ce3 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -9,6 +9,7 @@ mod layer_name; pub mod merge_iterator; use tokio::sync::{self}; +use tracing::{debug, Instrument}; use utils::bin_ser::BeSer; pub mod split_writer; @@ -160,7 +161,7 @@ pub(crate) struct ValuesReconstructState { enum IoConcurrency { Serial { - prev_io: Option>, + prev_io: Option<(usize, tokio::task::JoinHandle<()>)>, }, Parallel, } @@ -170,15 +171,22 @@ impl IoConcurrency { where F: std::future::Future + Send + 'static, { + static IO_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); + let io_id = IO_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let span = tracing::debug_span!("spawned_io", io_id,); match self { IoConcurrency::Serial { prev_io } => { let prev = prev_io.take(); - *prev_io = Some(tokio::spawn(async move { - if let Some(prev) = prev { - prev.await.unwrap(); + *prev_io = Some((io_id, tokio::spawn( + async move { + if let Some((prev_id, prev_task)) = prev { + debug!(prev_io = prev_id, "Waiting for previous IO to complete"); + prev_task.await.unwrap(); + } + fut.await; } - fut.await; - })); + .instrument(span), + ))); } IoConcurrency::Parallel => { tokio::spawn(fut);