mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
WIP: batching observability improvements
This commit is contained in:
@@ -1178,19 +1178,21 @@ pub(crate) mod virtual_file_io_engine {
|
||||
});
|
||||
}
|
||||
|
||||
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
|
||||
struct GlobalAndPerTimelineHistogramTimer<'a, 'c, I>
|
||||
where
|
||||
I: IntoIterator<Item = std::time::Instant> + ExactSizeIterator,
|
||||
{
|
||||
global_latency_histo: &'a Histogram,
|
||||
|
||||
// Optional because not all op types are tracked per-timeline
|
||||
per_timeline_latency_histo: Option<&'a Histogram>,
|
||||
|
||||
ctx: &'c RequestContext,
|
||||
start: std::time::Instant,
|
||||
starts: I,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
|
||||
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, _> {
|
||||
fn drop(&mut self) {
|
||||
let elapsed = self.start.elapsed();
|
||||
let ex_throttled = self
|
||||
@@ -1392,37 +1394,27 @@ impl SmgrQueryTimePerTimeline {
|
||||
) -> Option<impl Drop + 'a> {
|
||||
self.start_timer_many(op, 1, ctx)
|
||||
}
|
||||
pub(crate) fn start_timer_many<'c: 'a, 'a>(
|
||||
|
||||
pub(crate) fn start_timer_at<'c: 'a, 'a>(
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
count: usize,
|
||||
start: Instant,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + 'a> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.global_started[op as usize].inc();
|
||||
|
||||
// We subtract time spent throttled from the observed latency.
|
||||
match ctx.micros_spent_throttled.open() {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
|
||||
Lazy::new(|| {
|
||||
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
|
||||
RateLimit::new(Duration::from_secs(10))
|
||||
})))
|
||||
});
|
||||
let mut guard = LOGGED.lock().unwrap();
|
||||
let rate_limit = &mut guard[op];
|
||||
rate_limit.call(|| {
|
||||
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
|
||||
});
|
||||
}
|
||||
}
|
||||
self.start_timer_at_many(op, std::iter::once(start), ctx)
|
||||
}
|
||||
|
||||
pub(crate) fn start_timer_at_many<'c: 'a, 'a, T>(
|
||||
&'a self,
|
||||
op: SmgrQueryType,
|
||||
starts: T,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Option<impl Drop + 'a>
|
||||
where
|
||||
T: IntoIterator<Item = Instant> + ExactSizeIterator,
|
||||
{
|
||||
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
|
||||
self.per_timeline_getpage_started.inc();
|
||||
self.per_timeline_getpage_started.inc_by(starts.len());
|
||||
Some(&self.per_timeline_getpage_latency)
|
||||
} else {
|
||||
None
|
||||
@@ -1432,9 +1424,8 @@ impl SmgrQueryTimePerTimeline {
|
||||
global_latency_histo: &self.global_latency[op as usize],
|
||||
per_timeline_latency_histo,
|
||||
ctx,
|
||||
start,
|
||||
op,
|
||||
count,
|
||||
starts,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -536,11 +536,13 @@ impl From<WaitLsnError> for QueryError {
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
received_at: Instant,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamExistsRequest,
|
||||
},
|
||||
Nblocks {
|
||||
span: Span,
|
||||
received_at: Instant,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamNblocksRequest,
|
||||
},
|
||||
@@ -548,15 +550,17 @@ enum BatchedFeMessage {
|
||||
span: Span,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
effective_request_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
received_at: Instant,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamDbSizeRequest,
|
||||
},
|
||||
GetSlruSegment {
|
||||
span: Span,
|
||||
received_at: Instant,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamGetSlruSegmentRequest,
|
||||
},
|
||||
@@ -628,6 +632,8 @@ impl PageServerHandler {
|
||||
msg = pgb.read_message() => { msg }
|
||||
};
|
||||
|
||||
let received_at = Instant::now();
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => {
|
||||
@@ -656,7 +662,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
BatchedFeMessage::Exists { span, shard, req }
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
received_at,
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
@@ -664,7 +675,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
BatchedFeMessage::Nblocks { span, shard, req }
|
||||
BatchedFeMessage::Nblocks {
|
||||
span,
|
||||
received_at,
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
|
||||
@@ -672,7 +688,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
BatchedFeMessage::DbSize { span, shard, req }
|
||||
BatchedFeMessage::DbSize {
|
||||
span,
|
||||
received_at,
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
|
||||
@@ -680,7 +701,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
BatchedFeMessage::GetSlruSegment { span, shard, req }
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
span,
|
||||
received_at,
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
request_lsn,
|
||||
@@ -743,7 +769,7 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![(rel, blkno)],
|
||||
pages: smallvec::smallvec![(rel, blkno, received_at)],
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -830,7 +856,12 @@ impl PageServerHandler {
|
||||
// invoke handler function
|
||||
let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
|
||||
match batch {
|
||||
BatchedFeMessage::Exists { span, shard, req } => {
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
received_at: _,
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
(
|
||||
vec![
|
||||
@@ -841,7 +872,12 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::Nblocks { span, shard, req } => {
|
||||
BatchedFeMessage::Nblocks {
|
||||
span,
|
||||
received_at,
|
||||
shard,
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
(
|
||||
vec![
|
||||
@@ -1409,11 +1445,12 @@ impl PageServerHandler {
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
received_at: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
|
||||
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
@@ -1439,11 +1476,12 @@ impl PageServerHandler {
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
received_at: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
|
||||
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
@@ -1469,11 +1507,14 @@ impl PageServerHandler {
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
received_at: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
|
||||
let _timer = timeline.query_metrics.start_timer_at(
|
||||
metrics::SmgrQueryType::GetDbSize,
|
||||
received_at,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
@@ -1500,7 +1541,7 @@ impl PageServerHandler {
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
effective_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -202,7 +202,7 @@ impl Timeline {
|
||||
Version::Lsn(effective_lsn) => {
|
||||
let pages = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
|
||||
.get_rel_page_at_lsn_batched(&pages, effective_lsn, ctx)
|
||||
.await;
|
||||
assert_eq!(res.len(), 1);
|
||||
res.into_iter().next().unwrap()
|
||||
@@ -237,7 +237,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
pages: &smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
|
||||
effective_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<Bytes, PageReconstructError>> {
|
||||
@@ -251,7 +251,7 @@ impl Timeline {
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
|
||||
for (response_slot_idx, (tag, blknum)) in pages.iter().enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -262,7 +262,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
@@ -273,7 +273,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
if blknum >= nblocks {
|
||||
if *blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag, blknum, effective_lsn, nblocks
|
||||
@@ -283,7 +283,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push(response_slot_idx);
|
||||
|
||||
Reference in New Issue
Block a user