port forward dd722fdaf6 (flush recording arcs outside SmgrOpTimerInner)

This commit is contained in:
Christian Schwarz
2025-02-07 11:47:40 +01:00
parent 6e1cbce715
commit 8522f429eb
3 changed files with 82 additions and 93 deletions

View File

@@ -1253,9 +1253,6 @@ pub(crate) struct SmgrOpTimerInner {
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
throttling: Arc<tenant_throttling::Pagestream>,
timings: SmgrOpTimerState,
@@ -1366,20 +1363,18 @@ impl SmgrOpTimer {
/// The first caller receives Some, subsequent ones None.
///
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_end_flush_start(
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
pub(crate) fn observe_execution_end_flush_start(&mut self, at: Instant) {
// NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
return None;
// NB: this take() isn't needed anymore, maybe we can simplify
return;
};
let SmgrOpTimerState::Executing {
execution_started_at,
} = &inner.timings
else {
return None;
return;
};
// update metrics
let execution = at - *execution_started_at;
@@ -1394,36 +1389,9 @@ impl SmgrOpTimer {
// state transition
inner.timings = SmgrOpTimerState::Flushing;
// return the flush in progress object which
// will do the remaining metrics updates
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
Some(SmgrOpFlushInProgress {
flush_started_at: at,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
})
}
}
/// The last stage of request processing is serializing and flushing the request
/// into the TCP connection. We want to make slow flushes observable
/// _while they are occurring_, so this struct provides a wrapper method [`Self::measure`]
/// to periodically bump the metric.
///
/// If in the future we decide that we're not interested in live updates, we can
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
/// and remove this struct from the code base.
pub(crate) struct SmgrOpFlushInProgress {
// Start of the interval not yet accounted for in the counters;
// advanced each time the elapsed time is added to the metrics so
// no interval is counted twice.
flush_started_at: Instant,
// Pageserver-global counter of microseconds spent flushing.
global_micros: IntCounter,
// Per-timeline counter of microseconds spent flushing.
per_timeline_micros: IntCounter,
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
// In case of early drop, update any of the remaining metrics with
@@ -1442,42 +1410,6 @@ impl Drop for SmgrOpTimer {
}
}
impl SmgrOpFlushInProgress {
/// Drive `fut` to completion while periodically (every 10s) bumping the
/// flush-in-progress microsecond counters, so that slow flushes are
/// observable in metrics while they are still ongoing.
///
/// The final partial interval is accounted for when the guard is dropped
/// on return (or if this future is dropped/cancelled).
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `self.flush_started_at`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.flush_started_at;
// as_micros() is u128; the elapsed interval is at most ~10s,
// so the conversion to u64 cannot fail in practice.
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// Advance the base so the next observation only counts new time.
self.flush_started_at = now;
},
|mut observe| {
// Drop handler: flush the final partial interval into the metrics.
observe();
},
);
// Poll `fut` to completion, bumping the metrics every 10 seconds
// so long-running flushes show up in metrics while in progress.
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
#[derive(
Debug,
Clone,
@@ -1513,6 +1445,56 @@ pub(crate) struct SmgrQueryTimePerTimeline {
throttling: Arc<tenant_throttling::Pagestream>,
}
impl SmgrQueryTimePerTimeline {
/// Drive `fut` (the flush of a response into the TCP connection) to
/// completion while periodically (every 10s) bumping the global and
/// per-timeline flush-in-progress microsecond counters, so that slow
/// flushes are observable in metrics while they are still ongoing.
///
/// Takes a weak timeline handle instead of the metric Arcs so the caller
/// does not hold an upgraded handle across the (potentially slow) flush;
/// if the handle can no longer be upgraded, the metric update for that
/// interval is skipped.
pub(crate) async fn record_flush_in_progress<Fut, O>(
shard: &crate::tenant::timeline::handle::WeakHandle<
crate::page_service::TenantManagerTypes,
>,
start_at: Instant,
mut fut: Fut,
) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `base`.
let mut base = start_at;
let mut observe_guard = scopeguard::guard(
|| {
// Timeline may be gone by now; skip the update if so.
// NB: `base` is not advanced on this path, so a later successful
// observation still covers the skipped interval.
let Ok(upgraded) = shard.upgrade() else {
return;
};
let now = Instant::now();
let elapsed = now - base;
// as_micros() is u128; the elapsed interval is at most ~10s,
// so the conversion to u64 cannot fail in practice.
upgraded
.query_metrics
.global_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
upgraded
.query_metrics
.per_timeline_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// Advance the base so the next observation only counts new time.
base = now;
},
|mut observe| {
// Drop handler: flush the final partial interval into the metrics.
observe();
},
);
// Poll `fut` to completion, bumping the metrics every 10 seconds
// so long-running flushes show up in metrics while in progress.
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
@@ -1797,10 +1779,6 @@ impl SmgrQueryTimePerTimeline {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_execution_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_execution_latency_histo: per_timeline_latency_histo,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
global_batch_wait_time: self.global_batch_wait_time.clone(),
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
throttling: self.throttling.clone(),

View File

@@ -1063,9 +1063,10 @@ impl PageServerHandler {
};
// invoke handler function
let (handler_results, span): (
let (handler_results, span, shard): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
_,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
@@ -1082,6 +1083,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::Nblocks {
@@ -1099,6 +1101,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetPage {
@@ -1126,6 +1129,7 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::DbSize {
@@ -1143,6 +1147,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetSlruSegment {
@@ -1160,6 +1165,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
#[cfg(feature = "testing")]
@@ -1181,12 +1187,13 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
(vec![Err(error)], span, None)
}
};
@@ -1194,7 +1201,7 @@ impl PageServerHandler {
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
let (response_msg, mut timer) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1250,20 +1257,24 @@ impl PageServerHandler {
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
let start_flushing_at = Instant::now();
if let Some(timer) = &mut timer {
timer.observe_execution_end_flush_start(start_flushing_at);
}
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
let flush_fut = if let Some(shard) = &shard {
// don't hold upgraded handle while flushing!
futures::future::Either::Left(
metrics::SmgrQueryTimePerTimeline::record_flush_in_progress(
shard,
start_flushing_at,
flush_fut,
),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {

View File

@@ -326,7 +326,7 @@ pub struct Timeline {
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
pub(crate) query_metrics: Arc<crate::metrics::SmgrQueryTimePerTimeline>,
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
@@ -2517,11 +2517,11 @@ impl Timeline {
metrics,
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
query_metrics: Arc::new(crate::metrics::SmgrQueryTimePerTimeline::new(
&tenant_shard_id,
&timeline_id,
resources.pagestream_throttle_metrics,
),
)),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),