correct start & end times for smgr query observations (fixes #9925) at the expense of exclusion of throttling from metrics (too complicated lifetimes)

This commit is contained in:
Christian Schwarz
2024-11-29 18:49:41 +01:00
parent 6d36c07a33
commit 7a39ad4832
3 changed files with 194 additions and 215 deletions

View File

@@ -1216,52 +1216,21 @@ pub(crate) mod virtual_file_io_engine {
});
}
struct GlobalAndPerTimelineHistogramTimer<'a, 'c, I>
where
I: IntoIterator<Item = std::time::Instant> + ExactSizeIterator,
{
global_latency_histo: &'a Histogram,
pub(crate) struct GlobalAndPerTimelineHistogramTimer {
global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<&'a Histogram>,
per_timeline_latency_histo: Option<Histogram>,
ctx: &'c RequestContext,
starts: I,
op: SmgrQueryType,
start: Instant,
}
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, _> {
impl Drop for GlobalAndPerTimelineHistogramTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
for _ in 0..self.count {
self.global_latency_histo
.observe(ex_throttled.as_secs_f64());
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
}
let elapsed = self.start.elapsed().as_secs_f64();
self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}
}
}
@@ -1425,60 +1394,36 @@ impl SmgrQueryTimePerTimeline {
per_timeline_getpage_started,
}
}
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_at<'c: 'a, 'a>(
&'a self,
pub(crate) fn start_timer_at(
&self,
op: SmgrQueryType,
start: Instant,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
self.start_timer_at_many(op, std::iter::once(start), ctx)
}
) -> GlobalAndPerTimelineHistogramTimer {
self.global_started[op as usize].inc();
pub(crate) fn start_timer_at_many<'c: 'a, 'a, T>(
&'a self,
op: SmgrQueryType,
starts: T,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a>
where
T: IntoIterator<Item = Instant> + ExactSizeIterator,
{
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
self.per_timeline_getpage_started.inc_by(starts.len());
Some(&self.per_timeline_getpage_latency)
self.per_timeline_getpage_started.inc();
Some(self.per_timeline_getpage_latency.clone())
} else {
None
};
Some(GlobalAndPerTimelineHistogramTimer {
global_latency_histo: &self.global_latency[op as usize],
GlobalAndPerTimelineHistogramTimer {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
ctx,
op,
starts,
})
start,
}
}
}
#[cfg(test)]
mod smgr_query_time_tests {
use std::time::Instant;
use pageserver_api::shard::TenantShardId;
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
};
// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
@@ -1522,8 +1467,7 @@ mod smgr_query_time_tests {
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer(*op, &ctx);
let timer = metrics.start_timer_at(*op, Instant::now());
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();

View File

@@ -51,7 +51,7 @@ use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self};
use crate::metrics::{self, GlobalAndPerTimelineHistogramTimer};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -540,13 +540,13 @@ impl From<WaitLsnError> for QueryError {
enum BatchedFeMessage {
Exists {
span: Span,
received_at: Instant,
timer: GlobalAndPerTimelineHistogramTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
received_at: Instant,
timer: GlobalAndPerTimelineHistogramTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
@@ -554,17 +554,17 @@ enum BatchedFeMessage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1]>,
},
DbSize {
span: Span,
received_at: Instant,
timer: GlobalAndPerTimelineHistogramTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
received_at: Instant,
timer: GlobalAndPerTimelineHistogramTimer,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
@@ -666,9 +666,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at);
BatchedFeMessage::Exists {
span,
received_at,
timer,
shard,
req,
}
@@ -679,9 +682,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at);
BatchedFeMessage::Nblocks {
span,
received_at,
timer,
shard,
req,
}
@@ -692,9 +698,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at);
BatchedFeMessage::DbSize {
span,
received_at,
timer,
shard,
req,
}
@@ -705,9 +714,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at);
BatchedFeMessage::GetSlruSegment {
span,
received_at,
timer,
shard,
req,
}
@@ -754,6 +766,14 @@ impl PageServerHandler {
return respond_error!(e.into());
}
};
// It's important to start the timer before waiting for the LSN
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for LSNs.
let timer = shard
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at);
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
@@ -773,7 +793,7 @@ impl PageServerHandler {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno, received_at)],
pages: smallvec::smallvec![(rel, blkno, timer)],
}
}
};
@@ -858,98 +878,112 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// invoke handler function
let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
match batch {
BatchedFeMessage::Exists {
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer), PageStreamError>>,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![self
.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
received_at: _,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
self.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::Nblocks {
)
}
BatchedFeMessage::Nblocks {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![self
.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
received_at,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
self.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetPage {
)
}
BatchedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
shard,
effective_request_lsn,
pages,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
BatchedFeMessage::DbSize { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![
self.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetSlruSegment { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![
self.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
}
};
)
}
BatchedFeMessage::DbSize {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![self
.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
)
}
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![self
.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))],
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
@@ -980,7 +1014,10 @@ impl PageServerHandler {
})
}
},
Ok(response_msg) => response_msg,
Ok((response_msg, timer)) => {
timers.push(timer);
response_msg
}
};
// marshal & transmit response message
@@ -997,6 +1034,7 @@ impl PageServerHandler {
res?;
}
}
drop(timers);
Ok(())
}
@@ -1461,13 +1499,8 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1492,13 +1525,8 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1523,15 +1551,8 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetDbSize,
received_at,
ctx,
);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
@@ -1557,26 +1578,40 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
requests: smallvec::SmallVec<
[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1],
>,
ctx: &RequestContext,
) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, GlobalAndPerTimelineHistogramTimer), PageStreamError>>
{
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
);
let pages = timeline
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
let results = timeline
.get_rel_page_at_lsn_batched(
requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)),
effective_lsn,
ctx,
)
.await;
assert_eq!(results.len(), requests.len());
Vec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::from)
}))
// TODO: avoid creating the new Vec here
Vec::from_iter(
requests
.into_iter()
.zip(results.into_iter())
.map(|((_, _, timer), res)| {
res.map(|page| {
(
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
page,
}),
timer,
)
})
.map_err(PageStreamError::from)
}),
)
}
#[instrument(skip_all, fields(shard_id))]
@@ -1586,10 +1621,6 @@ impl PageServerHandler {
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,

View File

@@ -203,9 +203,13 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
let pages = smallvec::smallvec![(tag, blknum)];
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(&pages, effective_lsn, ctx)
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
effective_lsn,
ctx,
)
.await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
@@ -240,7 +244,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: &smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: impl Iterator<Item = (&RelTag, &BlockNumber)> + ExactSizeIterator,
effective_lsn: Lsn,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -254,7 +258,7 @@ impl Timeline {
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.iter().enumerate() {
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),