Compare commits

...

20 Commits

Author SHA1 Message Date
Christian Schwarz
5c7cb815c3 bring back instrument() 2025-02-07 03:23:59 +01:00
Christian Schwarz
dd722fdaf6 bring back flush in progress recording 2025-02-07 03:22:09 +01:00
Christian Schwarz
0f6d3429cd Revert "undo that last piece"
This reverts commit e3481bfcae.
2025-02-07 03:22:09 +01:00
Christian Schwarz
2052b8b98d make shard available during flush 2025-02-07 03:11:03 +01:00
Christian Schwarz
e3481bfcae undo that last piece 2025-02-07 03:02:38 +01:00
Christian Schwarz
0dd2e0d744 don't haul those useless arcs around 2025-02-07 02:48:53 +01:00
Christian Schwarz
abef267bab avoid one arc counter incing 2025-02-07 02:42:37 +01:00
Christian Schwarz
89838e46bf just not instrumented, validate that again, should be equivalent with f9528ec980 2025-02-07 02:30:15 +01:00
Christian Schwarz
47c4c33e0e don't instrument, now this is equivalent to 0cd4e120da 2025-02-07 02:18:41 +01:00
Christian Schwarz
1ce78b39fe maybe it's the match statement? remove it 2025-02-07 02:13:25 +01:00
Christian Schwarz
43c47e684f preserve the switch but don't use .measure() 2025-02-07 02:07:29 +01:00
Christian Schwarz
c89defed85 avoid futures::Either 2025-02-07 02:00:41 +01:00
Christian Schwarz
6e51750e05 Revert "don't update the counters at all, forgot that those are still in there"
This reverts commit 5af0f228ed.
2025-02-07 02:00:09 +01:00
Christian Schwarz
e59422eb77 Revert "none of the previous ones was it; see if it's scopeguard before trying work around Either"
This reverts commit a67b822c24.
2025-02-07 01:58:02 +01:00
Christian Schwarz
a67b822c24 none of the previous ones was it; see if it's scopeguard before trying work around Either 2025-02-07 01:56:11 +01:00
Christian Schwarz
99cc5323b6 Revert "not mergeable: don't do tokio::time::timeout() for flush seconds counter"
This reverts commit b89bd691f6.
2025-02-07 01:51:26 +01:00
Christian Schwarz
9efaeb871a Revert "prev was still slow, what if additionally I inline; if it's still slow, it's the futures::Either"
This reverts commit fe9417c98c.
2025-02-07 01:51:16 +01:00
Christian Schwarz
5af0f228ed don't update the counters at all, forgot that those are still in there 2025-02-07 01:46:37 +01:00
Christian Schwarz
fe9417c98c prev was still slow, what if additionally I inline; if it's still slow, it's the futures::Either 2025-02-07 01:39:23 +01:00
Christian Schwarz
b89bd691f6 not mergeable: don't do tokio::time::timeout() for flush seconds counter 2025-02-07 01:14:01 +01:00
2 changed files with 63 additions and 70 deletions

View File

@@ -1230,20 +1230,11 @@ pub(crate) struct SmgrOpTimerInner {
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
start: Instant,
throttled: Duration,
op: SmgrQueryType,
}
/// Bookkeeping for the "flush in progress" counters.
///
/// Produced by `SmgrOpTimer::observe_smgr_op_completion_and_start_flushing`
/// and consumed by `SmgrOpFlushInProgress::measure`, which wraps a future and
/// adds the elapsed time to both counters.
pub(crate) struct SmgrOpFlushInProgress {
// Start of the interval that has not yet been added to the counters;
// `measure` reassigns it after every observation.
base: Instant,
// Incremented by the elapsed microseconds on every observation.
// NOTE(review): presumably the process-wide total across timelines — confirm at the registration site.
global_micros: IntCounter,
// Incremented by the same amount as `global_micros`.
// NOTE(review): presumably labelled per timeline — confirm at the registration site.
per_timeline_micros: IntCounter,
}
impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
@@ -1253,20 +1244,11 @@ impl SmgrOpTimer {
inner.throttled += *throttle;
}
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> Instant {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
flush_start
}
/// Returns `None` if this method has already been called, `Some` otherwise.
@@ -1312,42 +1294,6 @@ impl Drop for SmgrOpTimer {
}
}
impl SmgrOpFlushInProgress {
    /// Drives `fut` to completion and returns its output, while adding the
    /// time that passes to the global and per-timeline flush-in-progress
    /// counters (in microseconds).
    ///
    /// The counters are updated each time a 10 second wait on `fut` times
    /// out, and once more when this future finishes or is dropped, because
    /// the observation closure lives in a scope guard.
    ///
    /// # Panics
    ///
    /// Panics if an elapsed interval in microseconds does not fit into `u64`.
    pub(crate) async fn measure<Fut, O>(mut self, fut: Fut) -> O
    where
        Fut: std::future::Future<Output = O>,
    {
        let mut fut = std::pin::pin!(fut);

        // Whenever observe_guard gets called, or dropped,
        // it adds the time elapsed since its last call to metrics.
        // The time of the last call is tracked in `self.base`.
        let mut observe_guard = scopeguard::guard(
            || {
                // Sample the clock on every observation. Sampling it once
                // outside this closure would make every interval after the
                // first one zero, so the flush time would never be counted.
                let now = Instant::now();
                let elapsed = now - self.base;
                let micros = u64::try_from(elapsed.as_micros()).unwrap();
                self.global_micros.inc_by(micros);
                self.per_timeline_micros.inc_by(micros);
                self.base = now;
            },
            |mut observe| {
                // Final observation on completion or cancellation.
                observe();
            },
        );

        loop {
            match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
                Ok(v) => return v,
                Err(_timeout) => {
                    // Still flushing: publish the time spent so far.
                    (*observe_guard)();
                }
            }
        }
    }
}
#[derive(
Debug,
Clone,
@@ -1379,6 +1325,47 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros: IntCounter,
}
impl SmgrQueryTimePerTimeline {
    /// Drives `fut` to completion and returns its output, while adding the
    /// time elapsed since `start_at` to the global and per-timeline
    /// flush-in-progress counters (in microseconds).
    ///
    /// The counters are updated each time a 10 second wait on `fut` times
    /// out, and once more when this future finishes or is dropped, because
    /// the observation closure lives in a scope guard.
    ///
    /// # Panics
    ///
    /// Panics if an elapsed interval in microseconds does not fit into `u64`.
    pub(crate) async fn record_flush_in_progress<Fut, O>(
        &self,
        start_at: Instant,
        fut: Fut,
    ) -> O
    where
        Fut: std::future::Future<Output = O>,
    {
        let mut fut = std::pin::pin!(fut);

        // Whenever observe_guard gets called, or dropped,
        // it adds the time elapsed since its last call to metrics.
        // The time of the last call is tracked in `base`.
        let mut base = start_at;
        let mut observe_guard = scopeguard::guard(
            || {
                let now = Instant::now();
                let elapsed = now - base;
                // Both counters receive the same increment; convert once.
                let micros = u64::try_from(elapsed.as_micros()).unwrap();
                self.global_flush_in_progress_micros.inc_by(micros);
                self.per_timeline_flush_in_progress_micros.inc_by(micros);
                base = now;
            },
            |mut observe| {
                // Final observation on completion or cancellation.
                observe();
            },
        );

        loop {
            match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
                Ok(v) => return v,
                Err(_timeout) => {
                    // Still flushing: publish the time spent so far.
                    (*observe_guard)();
                }
            }
        }
    }
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
@@ -1632,10 +1619,6 @@ impl SmgrQueryTimePerTimeline {
start: started_at,
op,
throttled: Duration::ZERO,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}

View File

@@ -913,9 +913,10 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// invoke handler function
let (handler_results, span): (
let (handler_results, span, shard): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>>,
_,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
@@ -931,6 +932,7 @@ impl PageServerHandler {
.await
.map(|msg| (msg, timer))],
span,
Some(shard),
)
}
BatchedFeMessage::Nblocks {
@@ -947,6 +949,7 @@ impl PageServerHandler {
.await
.map(|msg| (msg, timer))],
span,
Some(shard),
)
}
BatchedFeMessage::GetPage {
@@ -973,6 +976,7 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::DbSize {
@@ -989,6 +993,7 @@ impl PageServerHandler {
.await
.map(|msg| (msg, timer))],
span,
Some(shard),
)
}
BatchedFeMessage::GetSlruSegment {
@@ -1005,12 +1010,13 @@ impl PageServerHandler {
.await
.map(|msg| (msg, timer))],
span,
Some(shard),
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
(vec![Err(error)], span, None)
}
};
@@ -1071,17 +1077,21 @@ impl PageServerHandler {
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
let start_flushing_at = match timer {
Some(timer) => timer.observe_smgr_op_completion_and_start_flushing(),
None => Instant::now(),
};
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
let flush_fut = if let Some(handle) = &shard {
futures::future::Either::Left(
handle
.query_metrics
.record_flush_in_progress(start_flushing_at, flush_fut),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {