try add RequestContext throttle time tracking back

This commit is contained in:
Christian Schwarz
2024-11-29 19:01:57 +01:00
parent 92b36197f8
commit 3993979a98
2 changed files with 37 additions and 30 deletions

View File

@@ -1216,16 +1216,18 @@ pub(crate) mod virtual_file_io_engine {
});
}
pub(crate) struct GlobalAndPerTimelineHistogramTimer {
pub(crate) struct GlobalAndPerTimelineHistogramTimer<'c> {
global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,
start: Instant,
ctx: &'c RequestContext,
}
impl Drop for GlobalAndPerTimelineHistogramTimer {
impl<'c> Drop for GlobalAndPerTimelineHistogramTimer<'c> {
fn drop(&mut self) {
let elapsed = self.start.elapsed().as_secs_f64();
self.global_latency_histo.observe(elapsed);
@@ -1394,11 +1396,12 @@ impl SmgrQueryTimePerTimeline {
per_timeline_getpage_started,
}
}
pub(crate) fn start_timer_at(
pub(crate) fn start_timer_at<'c>(
&self,
op: SmgrQueryType,
start: Instant,
) -> GlobalAndPerTimelineHistogramTimer {
ctx: &'c RequestContext,
) -> GlobalAndPerTimelineHistogramTimer<'c> {
self.global_started[op as usize].inc();
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
@@ -1412,6 +1415,7 @@ impl SmgrQueryTimePerTimeline {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start,
ctx,
}
}
}
@@ -1424,6 +1428,8 @@ mod smgr_query_time_tests {
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};
use crate::{context::{DownloadBehavior, RequestContext}, task_mgr::TaskKind};
// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
@@ -1467,7 +1473,8 @@ mod smgr_query_time_tests {
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);
let timer = metrics.start_timer_at(*op, Instant::now());
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer_at(*op, Instant::now(), &ctx);
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();

View File

@@ -537,16 +537,16 @@ impl From<WaitLsnError> for QueryError {
}
}
enum BatchedFeMessage {
enum BatchedFeMessage<'c> {
Exists {
span: Span,
timer: GlobalAndPerTimelineHistogramTimer,
timer: GlobalAndPerTimelineHistogramTimer<'c>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: GlobalAndPerTimelineHistogramTimer,
timer: GlobalAndPerTimelineHistogramTimer<'c>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
@@ -554,17 +554,17 @@ enum BatchedFeMessage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1]>,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>,
},
DbSize {
span: Span,
timer: GlobalAndPerTimelineHistogramTimer,
timer: GlobalAndPerTimelineHistogramTimer<'c>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: GlobalAndPerTimelineHistogramTimer,
timer: GlobalAndPerTimelineHistogramTimer<'c>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
@@ -616,7 +616,7 @@ impl PageServerHandler {
)
}
async fn pagestream_read_message<IO>(
async fn pagestream_read_message<'c, IO>(
pgb: &mut PostgresBackendReader<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -624,7 +624,7 @@ impl PageServerHandler {
cancel: &CancellationToken,
ctx: &RequestContext,
parent_span: Span,
) -> Result<Option<BatchedFeMessage>, QueryError>
) -> Result<Option<BatchedFeMessage<'c>>, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
@@ -668,7 +668,7 @@ impl PageServerHandler {
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at);
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);
BatchedFeMessage::Exists {
span,
timer,
@@ -684,7 +684,7 @@ impl PageServerHandler {
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at);
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);
BatchedFeMessage::Nblocks {
span,
timer,
@@ -700,7 +700,7 @@ impl PageServerHandler {
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at);
.start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at, ctx);
BatchedFeMessage::DbSize {
span,
timer,
@@ -716,7 +716,7 @@ impl PageServerHandler {
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at);
.start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at, ctx);
BatchedFeMessage::GetSlruSegment {
span,
timer,
@@ -772,7 +772,7 @@ impl PageServerHandler {
// any serious waiting, e.g., for LSNs.
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at);
.start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at, ctx);
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
@@ -803,11 +803,11 @@ impl PageServerHandler {
/// Post-condition: `batch` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
#[allow(clippy::boxed_local)]
fn pagestream_do_batch(
fn pagestream_do_batch<'c>(
max_batch_size: NonZeroUsize,
batch: &mut Result<BatchedFeMessage, QueryError>,
this_msg: Result<BatchedFeMessage, QueryError>,
) -> Result<(), Result<BatchedFeMessage, QueryError>> {
batch: &mut Result<BatchedFeMessage<'c>, QueryError>,
this_msg: Result<BatchedFeMessage<'c>, QueryError>,
) -> Result<(), Result<BatchedFeMessage<'c>, QueryError>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let this_msg = match this_msg {
@@ -867,19 +867,19 @@ impl PageServerHandler {
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
async fn pagesteam_handle_batched_message<'c, IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
batch: BatchedFeMessage<'c>,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &'c RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// invoke handler function
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer), PageStreamError>>,
Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer<'c>), PageStreamError>>,
_,
) = match batch {
BatchedFeMessage::Exists {
@@ -1574,15 +1574,15 @@ impl PageServerHandler {
}
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
async fn handle_get_page_at_lsn_request_batched<'c>(
&mut self,
timeline: &Timeline,
effective_lsn: Lsn,
requests: smallvec::SmallVec<
[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1],
[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1],
>,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer), PageStreamError>>
ctx: &'c RequestContext,
) -> Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer<'c>), PageStreamError>>
{
debug_assert_current_span_has_tenant_and_timeline_id();