mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
pageserver: add tracing spans for time spent in batch and flushing (#12012)
## Problem We have some gaps in our traces. This indicates missing spans. ## Summary of changes This PR adds two new spans: * WAIT_EXECUTOR: time a batched request spends in the batch waiting to be picked up * FLUSH_RESPONSE: time a get page request spends flushing the response to the compute 
This commit is contained in:
@@ -769,6 +769,9 @@ struct BatchedGetPageRequest {
|
||||
timer: SmgrOpTimer,
|
||||
lsn_range: LsnRange,
|
||||
ctx: RequestContext,
|
||||
// If the request is perf enabled, this contains a context
|
||||
// with a perf span tracking the time spent waiting for the executor.
|
||||
batch_wait_ctx: Option<RequestContext>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -781,6 +784,7 @@ struct BatchedTestRequest {
|
||||
/// so that we don't keep the [`Timeline::gate`] open while the batch
|
||||
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
|
||||
#[derive(IntoStaticStr)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
@@ -1298,6 +1302,22 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
let batch_wait_ctx = if ctx.has_perf_span() {
|
||||
Some(
|
||||
RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_EXECUTOR",
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
@@ -1309,6 +1329,7 @@ impl PageServerHandler {
|
||||
request_lsn: req.hdr.request_lsn
|
||||
},
|
||||
ctx,
|
||||
batch_wait_ctx,
|
||||
}],
|
||||
// The executor grabs the batch when it becomes idle.
|
||||
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
|
||||
@@ -1464,7 +1485,7 @@ impl PageServerHandler {
|
||||
let mut flush_timers = Vec::with_capacity(handler_results.len());
|
||||
for handler_result in &mut handler_results {
|
||||
let flush_timer = match handler_result {
|
||||
Ok((_, timer)) => Some(
|
||||
Ok((_response, timer, _ctx)) => Some(
|
||||
timer
|
||||
.observe_execution_end(flushing_start_time)
|
||||
.expect("we are the first caller"),
|
||||
@@ -1484,7 +1505,7 @@ impl PageServerHandler {
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
|
||||
let response_msg = match handler_result {
|
||||
let (response_msg, ctx) = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
@@ -1509,15 +1530,30 @@ impl PageServerHandler {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
req: e.req,
|
||||
message: e.err.to_string(),
|
||||
})
|
||||
(
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
req: e.req,
|
||||
message: e.err.to_string(),
|
||||
}),
|
||||
None,
|
||||
)
|
||||
}
|
||||
},
|
||||
Ok((response_msg, _op_timer_already_observed)) => response_msg,
|
||||
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
|
||||
};
|
||||
|
||||
let ctx = ctx.map(|req_ctx| {
|
||||
RequestContextBuilder::from(&req_ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"FLUSH_RESPONSE",
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
});
|
||||
|
||||
//
|
||||
// marshal & transmit response message
|
||||
//
|
||||
@@ -1540,6 +1576,17 @@ impl PageServerHandler {
|
||||
)),
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
|
||||
let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
|
||||
futures::future::Either::Left(
|
||||
flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
|
||||
current_perf_span.clone()
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
futures::future::Either::Right(flush_fut)
|
||||
};
|
||||
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
@@ -1569,7 +1616,7 @@ impl PageServerHandler {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
|
||||
Span,
|
||||
),
|
||||
QueryError,
|
||||
@@ -1596,7 +1643,7 @@ impl PageServerHandler {
|
||||
self.handle_get_rel_exists_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1615,7 +1662,7 @@ impl PageServerHandler {
|
||||
self.handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1662,7 +1709,7 @@ impl PageServerHandler {
|
||||
self.handle_db_size_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1681,7 +1728,7 @@ impl PageServerHandler {
|
||||
self.handle_get_slru_segment_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -2033,12 +2080,25 @@ impl PageServerHandler {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let batch = match batch {
|
||||
let mut batch = match batch {
|
||||
Ok(batch) => batch,
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if let BatchedFeMessage::GetPage {
|
||||
pages,
|
||||
span: _,
|
||||
shard: _,
|
||||
batch_break_reason: _,
|
||||
} = &mut batch
|
||||
{
|
||||
for req in pages {
|
||||
req.batch_wait_ctx.take();
|
||||
}
|
||||
}
|
||||
|
||||
self.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
@@ -2351,7 +2411,8 @@ impl PageServerHandler {
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
@@ -2458,6 +2519,7 @@ impl PageServerHandler {
|
||||
page,
|
||||
}),
|
||||
req.timer,
|
||||
req.ctx,
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
@@ -2502,7 +2564,8 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
|
||||
{
|
||||
// real requests would do something with the timeline
|
||||
let mut results = Vec::with_capacity(requests.len());
|
||||
for _req in requests.iter() {
|
||||
@@ -2529,6 +2592,10 @@ impl PageServerHandler {
|
||||
req: req.req.clone(),
|
||||
}),
|
||||
req.timer,
|
||||
RequestContext::new(
|
||||
TaskKind::PageRequestHandler,
|
||||
DownloadBehavior::Warn,
|
||||
),
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
|
||||
Reference in New Issue
Block a user