mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
page_service: don't count time spent flushing towards smgr latency metrics (#10042)
## Problem In #9962 I changed the smgr metrics to include time spent on flush. It isn't under our (=storage team's) control how long that flush takes because the client can stop reading requests. ## Summary of changes Stop the timer as soon as we've buffered up the response in the `pgb_writer`. Track flush time in a separate metric. --------- Co-authored-by: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
b1fd086c0c
commit
4d7111f240
@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrOpTimer {
|
||||
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
|
||||
pub(crate) struct SmgrOpTimerInner {
|
||||
global_latency_histo: Histogram,
|
||||
|
||||
// Optional because not all op types are tracked per-timeline
|
||||
per_timeline_latency_histo: Option<Histogram>,
|
||||
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
|
||||
start: Instant,
|
||||
throttled: Duration,
|
||||
op: SmgrQueryType,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrOpFlushInProgress {
|
||||
base: Instant,
|
||||
global_micros: IntCounter,
|
||||
per_timeline_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl SmgrOpTimer {
|
||||
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
|
||||
let Some(throttle) = throttle else {
|
||||
return;
|
||||
};
|
||||
self.throttled += *throttle;
|
||||
let inner = self.0.as_mut().expect("other public methods consume self");
|
||||
inner.throttled += *throttle;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SmgrOpTimer {
|
||||
fn drop(&mut self) {
|
||||
let elapsed = self.start.elapsed();
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
|
||||
let (flush_start, inner) = self
|
||||
.smgr_op_end()
|
||||
.expect("this method consume self, and the only other caller is drop handler");
|
||||
let SmgrOpTimerInner {
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
..
|
||||
} = inner;
|
||||
SmgrOpFlushInProgress {
|
||||
base: flush_start,
|
||||
global_micros: global_flush_in_progress_micros,
|
||||
per_timeline_micros: per_timeline_flush_in_progress_micros,
|
||||
}
|
||||
}
|
||||
|
||||
let elapsed = match elapsed.checked_sub(self.throttled) {
|
||||
/// Returns `None`` if this method has already been called, `Some` otherwise.
|
||||
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
|
||||
let inner = self.0.take()?;
|
||||
|
||||
let now = Instant::now();
|
||||
let elapsed = now - inner.start;
|
||||
|
||||
let elapsed = match elapsed.checked_sub(inner.throttled) {
|
||||
Some(elapsed) => elapsed,
|
||||
None => {
|
||||
use utils::rate_limit::RateLimit;
|
||||
@@ -1258,9 +1287,9 @@ impl Drop for SmgrOpTimer {
|
||||
})))
|
||||
});
|
||||
let mut guard = LOGGED.lock().unwrap();
|
||||
let rate_limit = &mut guard[self.op];
|
||||
let rate_limit = &mut guard[inner.op];
|
||||
rate_limit.call(|| {
|
||||
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
||||
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
||||
});
|
||||
elapsed // un-throttled time, more info than just saturating to 0
|
||||
}
|
||||
@@ -1268,10 +1297,54 @@ impl Drop for SmgrOpTimer {
|
||||
|
||||
let elapsed = elapsed.as_secs_f64();
|
||||
|
||||
self.global_latency_histo.observe(elapsed);
|
||||
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
|
||||
inner.global_latency_histo.observe(elapsed);
|
||||
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
|
||||
per_timeline_getpage_histo.observe(elapsed);
|
||||
}
|
||||
|
||||
Some((now, inner))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SmgrOpTimer {
|
||||
fn drop(&mut self) {
|
||||
self.smgr_op_end();
|
||||
}
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
let now = Instant::now();
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `now`.
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let elapsed = now - self.base;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_getpage_latency: Histogram,
|
||||
global_batch_size: Histogram,
|
||||
per_timeline_batch_size: Histogram,
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
|
||||
.set(value.try_into().unwrap());
|
||||
}
|
||||
|
||||
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_service_pagestream_flush_in_progress_micros",
|
||||
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
|
||||
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
|
||||
easily discoverable in monitoring. \
|
||||
Hence, this is NOT a completion latency historgram.",
|
||||
&["tenant_id", "shard_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
|
||||
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
impl SmgrQueryTimePerTimeline {
|
||||
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
@@ -1504,6 +1599,12 @@ impl SmgrQueryTimePerTimeline {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let global_flush_in_progress_micros =
|
||||
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
|
||||
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
global_started,
|
||||
global_latency,
|
||||
@@ -1511,6 +1612,8 @@ impl SmgrQueryTimePerTimeline {
|
||||
per_timeline_getpage_started,
|
||||
global_batch_size,
|
||||
per_timeline_batch_size,
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
}
|
||||
}
|
||||
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
|
||||
@@ -1523,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
|
||||
None
|
||||
};
|
||||
|
||||
SmgrOpTimer {
|
||||
SmgrOpTimer(Some(SmgrOpTimerInner {
|
||||
global_latency_histo: self.global_latency[op as usize].clone(),
|
||||
per_timeline_latency_histo,
|
||||
start: started_at,
|
||||
op,
|
||||
throttled: Duration::ZERO,
|
||||
}
|
||||
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
|
||||
per_timeline_flush_in_progress_micros: self
|
||||
.per_timeline_flush_in_progress_micros
|
||||
.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
||||
@@ -2777,6 +2884,11 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1017,10 +1017,8 @@ impl PageServerHandler {
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
let mut timers: smallvec::SmallVec<[_; 1]> =
|
||||
smallvec::SmallVec::with_capacity(handler_results.len());
|
||||
for handler_result in handler_results {
|
||||
let response_msg = match handler_result {
|
||||
let (response_msg, timer) = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
@@ -1044,34 +1042,66 @@ impl PageServerHandler {
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
(
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
}),
|
||||
None, // TODO: measure errors
|
||||
)
|
||||
}
|
||||
},
|
||||
Ok((response_msg, timer)) => {
|
||||
// Extending the lifetime of the timers so observations on drop
|
||||
// include the flush time.
|
||||
timers.push(timer);
|
||||
response_msg
|
||||
}
|
||||
Ok((response_msg, timer)) => (response_msg, Some(timer)),
|
||||
};
|
||||
|
||||
//
|
||||
// marshal & transmit response message
|
||||
//
|
||||
|
||||
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel.cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
res = pgb_writer.flush() => {
|
||||
res?;
|
||||
|
||||
// We purposefully don't count flush time into the timer.
|
||||
//
|
||||
// The reason is that current compute client will not perform protocol processing
|
||||
// if the postgres backend process is doing things other than `->smgr_read()`.
|
||||
// This is especially the case for prefetch.
|
||||
//
|
||||
// If the compute doesn't read from the connection, eventually TCP will backpressure
|
||||
// all the way into our flush call below.
|
||||
//
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer =
|
||||
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
|
||||
|
||||
// what we want to do
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(flush_fut))
|
||||
}
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel.cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
res = flush_fut => {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// and log the info! line inside the request span
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
}
|
||||
drop(timers);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -176,6 +176,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
counter("pageserver_tenant_throttling_count"),
|
||||
counter("pageserver_timeline_wal_records_received"),
|
||||
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
|
||||
*histogram("pageserver_page_service_batch_size"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
|
||||
Reference in New Issue
Block a user