mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 17:40:37 +00:00
wip: track read path stats in context
This commit is contained in:
@@ -86,6 +86,11 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::{
|
||||
sync::{atomic::AtomicU32, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
pub(crate) mod optional_counter;
|
||||
@@ -98,6 +103,73 @@ pub struct RequestContext {
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
|
||||
pub read_path_stats: ReadPathStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ReadPathStats {
|
||||
pub inner: Arc<ReadPathStatsInner>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ReadPathStatsInner {
|
||||
pub get_reconstruct_data_time: AtomicU32,
|
||||
pub plan_read_time: AtomicU32,
|
||||
pub read_time: AtomicU32,
|
||||
pub sort_reconstruct_data_time: AtomicU32,
|
||||
pub layers_visited: AtomicU32,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ReadPathStatsInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"get_reconstruct_data_time={} plan_read_time={} read_time={} sort_time={} layers_visited={}",
|
||||
self.get_reconstruct_data_time.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.plan_read_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.read_time.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.sort_reconstruct_data_time
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
self.layers_visited
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadPathStatsInner {
|
||||
pub fn add_get_reconstruct_data_time(&self, dur: Duration) {
|
||||
self.get_reconstruct_data_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_plan_read_time(&self, dur: Duration) {
|
||||
self.plan_read_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_read_time(&self, dur: Duration) {
|
||||
self.plan_read_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn add_sort_reconstruct_data_time(&self, dur: Duration) {
|
||||
self.sort_reconstruct_data_time.fetch_add(
|
||||
dur.as_micros().try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn inc_layer_visited(&self) {
|
||||
self.layers_visited
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -154,6 +226,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
micros_spent_throttled: Default::default(),
|
||||
read_path_stats: Default::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -168,6 +241,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
micros_spent_throttled: Default::default(),
|
||||
read_path_stats: original.read_path_stats.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -291,4 +365,8 @@ impl RequestContext {
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub fn report_stats(&self) {
|
||||
tracing::info!("Read path stats: {}", *self.read_path_stats.inner)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,7 @@ impl MicroSecondsCounterU32 {
|
||||
Err(_) => Err("add(): duration conversion error"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
|
||||
let val = self.inner.close()?;
|
||||
let val = Duration::from_micros(val as u64);
|
||||
|
||||
@@ -58,6 +58,7 @@ use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::*;
|
||||
|
||||
@@ -755,6 +756,9 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let mut need_image = true;
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
@@ -790,6 +794,12 @@ impl DeltaLayerInner {
|
||||
)
|
||||
.await?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerValue)
|
||||
.build();
|
||||
@@ -828,6 +838,10 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
@@ -851,6 +865,9 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
@@ -879,11 +896,21 @@ impl DeltaLayerInner {
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,7 @@ use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
@@ -422,6 +423,9 @@ impl ImageLayerInner {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
|
||||
@@ -437,6 +441,12 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
@@ -450,6 +460,11 @@ impl ImageLayerInner {
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
@@ -464,14 +479,27 @@ impl ImageLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let reads = self
|
||||
.plan_reads(keyspace, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -298,6 +298,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let read_timer = Instant::now();
|
||||
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
|
||||
@@ -333,6 +336,10 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
// release lock on 'inner'
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
@@ -355,6 +362,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
ctx.read_path_stats.inner.inc_layer_visited();
|
||||
let plan_timer = Instant::now();
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
@@ -394,6 +404,12 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_plan_read_time(plan_timer.elapsed());
|
||||
|
||||
let read_timer = Instant::now();
|
||||
|
||||
let keyspace_size = keyspace.total_size();
|
||||
|
||||
let mut completed_keys = HashSet::new();
|
||||
@@ -428,6 +444,10 @@ impl InMemoryLayer {
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
|
||||
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_read_time(read_timer.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -739,7 +739,7 @@ impl Timeline {
|
||||
None => None,
|
||||
};
|
||||
|
||||
match self.conf.get_impl {
|
||||
let res = match self.conf.get_impl {
|
||||
GetImpl::Legacy => {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
@@ -796,7 +796,13 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if key.to_i128() % 100 < 10 {
|
||||
ctx.report_stats();
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
@@ -818,9 +824,13 @@ impl Timeline {
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(crate::metrics::GetKind::Singular)
|
||||
.start_timer();
|
||||
let get_reconstruct_data_timer = Instant::now();
|
||||
let path = self
|
||||
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_get_reconstruct_data_time(get_reconstruct_data_timer.elapsed());
|
||||
timer.stop_and_record();
|
||||
|
||||
let start = Instant::now();
|
||||
@@ -1015,8 +1025,12 @@ impl Timeline {
|
||||
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let timer = Instant::now();
|
||||
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_get_reconstruct_data_time(timer.elapsed());
|
||||
get_data_timer.stop_and_record();
|
||||
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
@@ -1029,7 +1043,11 @@ impl Timeline {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let timer = Instant::now();
|
||||
let state = ValueReconstructState::from(state);
|
||||
ctx.read_path_stats
|
||||
.inner
|
||||
.add_sort_reconstruct_data_time(timer.elapsed());
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
@@ -1072,10 +1090,10 @@ impl Timeline {
|
||||
panic!(concat!("Sequential get failed with {}, but vectored get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
seq_err, keyspace, lsn) },
|
||||
(Ok(_), Err(vec_err)) => {
|
||||
(Ok(seq_ok), Err(vec_err)) => {
|
||||
panic!(concat!("Vectored get failed with {}, but sequential get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
vec_err, keyspace, lsn) },
|
||||
" - keyspace={:?} lsn={} seq_ok={:?}"),
|
||||
vec_err, keyspace, lsn, seq_ok) },
|
||||
(Err(seq_err), Err(vec_err)) => {
|
||||
assert!(errors_match(seq_err, vec_err),
|
||||
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
|
||||
|
||||
Reference in New Issue
Block a user