mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
bring back flush in progress recording
This commit is contained in:
@@ -1244,10 +1244,11 @@ impl SmgrOpTimer {
|
||||
inner.throttled += *throttle;
|
||||
}
|
||||
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) {
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> Instant {
|
||||
let (flush_start, inner) = self
|
||||
.smgr_op_end()
|
||||
.expect("this method consume self, and the only other caller is drop handler");
|
||||
flush_start
|
||||
}
|
||||
|
||||
/// Returns `None`` if this method has already been called, `Some` otherwise.
|
||||
@@ -1324,6 +1325,47 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl SmgrQueryTimePerTimeline {
|
||||
pub(crate) async fn record_flush_in_progress<Fut, O>(
|
||||
&self,
|
||||
start_at: Instant,
|
||||
mut fut: Fut,
|
||||
) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `now`.
|
||||
let mut base = start_at;
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let now = Instant::now();
|
||||
let elapsed = now - base;
|
||||
self.global_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
|
||||
@@ -1077,11 +1077,22 @@ impl PageServerHandler {
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer =
|
||||
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
|
||||
let start_flushing_at = match timer {
|
||||
Some(timer) => timer.observe_smgr_op_completion_and_start_flushing(),
|
||||
None => Instant::now(),
|
||||
};
|
||||
|
||||
// what we want to do
|
||||
let flush_fut = pgb_writer.flush();
|
||||
let flush_fut = if let Some(handle) = &shard {
|
||||
futures::future::Either::Left(
|
||||
handle
|
||||
.query_metrics
|
||||
.record_flush_in_progress(start_flushing_at, flush_fut),
|
||||
)
|
||||
} else {
|
||||
futures::future::Either::Right(flush_fut)
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
|
||||
Reference in New Issue
Block a user