mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Compare commits
20 Commits
use_debug_
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5c7cb815c3 | ||
|
|
dd722fdaf6 | ||
|
|
0f6d3429cd | ||
|
|
2052b8b98d | ||
|
|
e3481bfcae | ||
|
|
0dd2e0d744 | ||
|
|
abef267bab | ||
|
|
89838e46bf | ||
|
|
47c4c33e0e | ||
|
|
1ce78b39fe | ||
|
|
43c47e684f | ||
|
|
c89defed85 | ||
|
|
6e51750e05 | ||
|
|
e59422eb77 | ||
|
|
a67b822c24 | ||
|
|
99cc5323b6 | ||
|
|
9efaeb871a | ||
|
|
5af0f228ed | ||
|
|
fe9417c98c | ||
|
|
b89bd691f6 |
@@ -1230,20 +1230,11 @@ pub(crate) struct SmgrOpTimerInner {
|
||||
// Optional because not all op types are tracked per-timeline
|
||||
per_timeline_latency_histo: Option<Histogram>,
|
||||
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
|
||||
start: Instant,
|
||||
throttled: Duration,
|
||||
op: SmgrQueryType,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrOpFlushInProgress {
|
||||
base: Instant,
|
||||
global_micros: IntCounter,
|
||||
per_timeline_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl SmgrOpTimer {
|
||||
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
|
||||
let Some(throttle) = throttle else {
|
||||
@@ -1253,20 +1244,11 @@ impl SmgrOpTimer {
|
||||
inner.throttled += *throttle;
|
||||
}
|
||||
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> Instant {
|
||||
let (flush_start, inner) = self
|
||||
.smgr_op_end()
|
||||
.expect("this method consume self, and the only other caller is drop handler");
|
||||
let SmgrOpTimerInner {
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
..
|
||||
} = inner;
|
||||
SmgrOpFlushInProgress {
|
||||
base: flush_start,
|
||||
global_micros: global_flush_in_progress_micros,
|
||||
per_timeline_micros: per_timeline_flush_in_progress_micros,
|
||||
}
|
||||
flush_start
|
||||
}
|
||||
|
||||
/// Returns `None` if this method has already been called, `Some` otherwise.
|
||||
@@ -1312,42 +1294,6 @@ impl Drop for SmgrOpTimer {
|
||||
}
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
let now = Instant::now();
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `now`.
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let elapsed = now - self.base;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
@@ -1379,6 +1325,47 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl SmgrQueryTimePerTimeline {
|
||||
pub(crate) async fn record_flush_in_progress<Fut, O>(
|
||||
&self,
|
||||
start_at: Instant,
|
||||
mut fut: Fut,
|
||||
) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `base`.
|
||||
let mut base = start_at;
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let now = Instant::now();
|
||||
let elapsed = now - base;
|
||||
self.global_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// It's a counter, but the name is chosen so that it can later be extended to a histogram of queue depth
|
||||
@@ -1632,10 +1619,6 @@ impl SmgrQueryTimePerTimeline {
|
||||
start: started_at,
|
||||
op,
|
||||
throttled: Duration::ZERO,
|
||||
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
|
||||
per_timeline_flush_in_progress_micros: self
|
||||
.per_timeline_flush_in_progress_micros
|
||||
.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -913,9 +913,10 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
// invoke handler function
|
||||
let (handler_results, span): (
|
||||
let (handler_results, span, shard): (
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>>,
|
||||
_,
|
||||
_,
|
||||
) = match batch {
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
@@ -931,6 +932,7 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map(|msg| (msg, timer))],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::Nblocks {
|
||||
@@ -947,6 +949,7 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map(|msg| (msg, timer))],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetPage {
|
||||
@@ -973,6 +976,7 @@ impl PageServerHandler {
|
||||
res
|
||||
},
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::DbSize {
|
||||
@@ -989,6 +993,7 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map(|msg| (msg, timer))],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
@@ -1005,12 +1010,13 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map(|msg| (msg, timer))],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::RespondError { span, error } => {
|
||||
// We've already decided to respond with an error, so we don't need to
|
||||
// call the handler.
|
||||
(vec![Err(error)], span)
|
||||
(vec![Err(error)], span, None)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1071,17 +1077,21 @@ impl PageServerHandler {
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer =
|
||||
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
|
||||
let start_flushing_at = match timer {
|
||||
Some(timer) => timer.observe_smgr_op_completion_and_start_flushing(),
|
||||
None => Instant::now(),
|
||||
};
|
||||
|
||||
// what we want to do
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(flush_fut))
|
||||
}
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
let flush_fut = if let Some(handle) = &shard {
|
||||
futures::future::Either::Left(
|
||||
handle
|
||||
.query_metrics
|
||||
.record_flush_in_progress(start_flushing_at, flush_fut),
|
||||
)
|
||||
} else {
|
||||
futures::future::Either::Right(flush_fut)
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
|
||||
Reference in New Issue
Block a user