mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
pageserver: add get page perf tracing
Sampling is done in page service after reading the request from the wire. A completely separate span hierarchy is used for perf tracing. The spans live in the `RequestContext` and span relationships are expressed via the APIs exposed by `RequestContext`.
This commit is contained in:
@@ -28,7 +28,7 @@ use core::{
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{field, span::Span, Dispatch};
|
||||
use tracing::{Dispatch, field, span::Span};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PerfSpan {
|
||||
|
||||
@@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
// Target used for performance traces.
|
||||
pub const PERF_TRACE_TARGET: &str = "P";
|
||||
|
||||
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -17,7 +18,7 @@ use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedExecutionStrategy,
|
||||
PageServiceProtocolPipelinedExecutionStrategy, Tracing,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::models::{
|
||||
@@ -36,6 +37,7 @@ use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use rand::Rng;
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -607,6 +609,7 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -743,6 +746,7 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_handles: &mut TimelineHandles,
|
||||
tracing_config: Option<&Tracing>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
@@ -902,10 +906,55 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.await
|
||||
{
|
||||
|
||||
let sampled = match tracing_config {
|
||||
Some(conf) => {
|
||||
let ratio = &conf.sampling_ratio;
|
||||
|
||||
if ratio.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
};
|
||||
|
||||
let get_page_context = if sampled {
|
||||
RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_PAGE",
|
||||
tenant_id = %tenant_id,
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid,
|
||||
key = %key)
|
||||
})
|
||||
.attached_child()
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
let res = get_page_context
|
||||
.maybe_instrument(
|
||||
timeline_handles.get(tenant_id, timeline_id, ShardSelector::Page(key)),
|
||||
|current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"SHARD_SELECTION",
|
||||
tenant_id = %tenant_id,
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let shard = match res {
|
||||
Ok(tl) => tl,
|
||||
Err(e) => {
|
||||
let span = mkspan!(before shard routing);
|
||||
@@ -934,24 +983,59 @@ impl PageServerHandler {
|
||||
};
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
// TODO(vlad): why does this not show up?
|
||||
get_page_context.perf_span_record(
|
||||
"shard",
|
||||
tracing::field::display(shard.get_shard_identity().shard_slug()),
|
||||
);
|
||||
|
||||
let timer = get_page_context
|
||||
.maybe_instrument(
|
||||
record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
),
|
||||
|current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"THROTTLE",
|
||||
tenant_id = %tenant_id,
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid
|
||||
)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
let res = get_page_context
|
||||
.maybe_instrument(
|
||||
Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
),
|
||||
|current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"WAIT_LSN",
|
||||
tenant_id = %tenant_id,
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let effective_request_lsn = match res {
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(span, e);
|
||||
@@ -961,7 +1045,11 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest {
|
||||
req,
|
||||
timer,
|
||||
ctx: get_page_context
|
||||
}],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1493,12 +1581,15 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let cancel = self.cancel.clone();
|
||||
let tracing_config = self.conf.tracing.clone();
|
||||
|
||||
let err = loop {
|
||||
let msg = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
tracing_config.as_ref(),
|
||||
&cancel,
|
||||
ctx,
|
||||
protocol_version,
|
||||
@@ -1632,6 +1723,8 @@ impl PageServerHandler {
|
||||
// Batcher
|
||||
//
|
||||
|
||||
let tracing_config = self.conf.tracing.clone();
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
|
||||
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
|
||||
@@ -1645,6 +1738,7 @@ impl PageServerHandler {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
tracing_config.as_ref(),
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
protocol_version,
|
||||
@@ -1983,7 +2077,9 @@ impl PageServerHandler {
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
requests
|
||||
.iter()
|
||||
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
@@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
@@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::context::{RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -209,7 +210,9 @@ impl Timeline {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages.iter().map(|(tag, blknum)| (tag, blknum)),
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
@@ -248,7 +251,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
|
||||
effective_lsn: Lsn,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -262,8 +265,11 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
BTreeMap::default();
|
||||
|
||||
let mut perf_instrument = false;
|
||||
for (response_slot_idx, (tag, blknum, req_ctx)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -273,6 +279,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: perf span
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.await
|
||||
@@ -297,8 +304,12 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
if req_ctx.has_perf_span() {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push(response_slot_idx);
|
||||
key_slots.push((response_slot_idx, req_ctx));
|
||||
}
|
||||
|
||||
let keyspace = {
|
||||
@@ -314,16 +325,36 @@ impl Timeline {
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
|
||||
.await
|
||||
{
|
||||
let get_vectored_ctx = match perf_instrument {
|
||||
true => RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_VECTORED",
|
||||
tenant_id = %self.tenant_shard_id.tenant_id,
|
||||
timeline_id = %self.timeline_id,
|
||||
lsn = %effective_lsn,
|
||||
shard = %self.tenant_shard_id.shard_slug(),
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
false => ctx.attached_child(),
|
||||
};
|
||||
|
||||
let res = get_vectored_ctx
|
||||
.maybe_instrument(
|
||||
self.get_vectored(keyspace, effective_lsn, io_concurrency, &get_vectored_ctx),
|
||||
|current_perf_span| current_perf_span.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let first_slot = key_slots.next().unwrap();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for slot in key_slots {
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
@@ -341,17 +372,19 @@ impl Timeline {
|
||||
};
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
req_ctx.perf_follows_from(&get_vectored_ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(&get_vectored_ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for slot in keys_slots.values().flatten() {
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -383,6 +416,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
req_ctx.perf_follows_from(&get_vectored_ctx);
|
||||
result_slots[*slot].write(err);
|
||||
}
|
||||
|
||||
|
||||
@@ -13,13 +13,13 @@ pub mod merge_iterator;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
use bytes::Bytes;
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, trace};
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -43,7 +43,7 @@ use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -874,13 +874,51 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
let persistent_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
persistent_context
|
||||
.maybe_instrument(
|
||||
layer.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
lsn_range,
|
||||
reconstruct_state,
|
||||
&persistent_context,
|
||||
),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
let in_mem_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
in_mem_context
|
||||
.maybe_instrument(
|
||||
layer.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
lsn_range,
|
||||
reconstruct_state,
|
||||
&in_mem_context,
|
||||
),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use tracing::Instrument;
|
||||
use tracing::{Instrument, info_span};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -324,16 +325,29 @@ impl Layer {
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let downloaded =
|
||||
self.0
|
||||
.get_or_maybe_download(true, ctx)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
let get_layer_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let downloaded = get_layer_context
|
||||
.maybe_instrument(
|
||||
self.0.get_or_maybe_download(true, &get_layer_context),
|
||||
|crnt_perf_context| crnt_perf_context.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let this = ResidentLayer {
|
||||
downloaded: downloaded.clone(),
|
||||
owner: self.clone(),
|
||||
@@ -341,9 +355,29 @@ impl Layer {
|
||||
|
||||
self.record_access(ctx);
|
||||
|
||||
downloaded
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
let visit_layer_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"VISIT_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
visit_layer_context
|
||||
.maybe_instrument(
|
||||
downloaded
|
||||
.get_values_reconstruct_data(
|
||||
this,
|
||||
keyspace,
|
||||
lsn_range,
|
||||
reconstruct_data,
|
||||
&visit_layer_context,
|
||||
)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
@@ -1045,15 +1079,36 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
|
||||
let download_ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, download_ctx)
|
||||
let res = download_ctx
|
||||
.maybe_instrument(
|
||||
self.download_init_and_wait(timeline, permit, download_ctx.attached_child()),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -94,7 +95,7 @@ use super::{
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
@@ -1274,9 +1275,28 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
let plan_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let traversal_res: Result<(), _> = plan_context
|
||||
.maybe_instrument(
|
||||
self.get_vectored_reconstruct_data(
|
||||
keyspace.clone(),
|
||||
lsn,
|
||||
reconstruct_state,
|
||||
&plan_context,
|
||||
),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
// Wait for all the spawned IOs to complete.
|
||||
// See comments on `spawn_io` inside `storage_layer` for more details.
|
||||
@@ -1290,14 +1310,45 @@ impl Timeline {
|
||||
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let execute_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
let execute_key_context = RequestContextBuilder::from(&execute_context)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT_KEY",
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
assert_eq!(state.situation, ValueReconstructSituation::Complete);
|
||||
|
||||
let converted = match state.collect_pending_ios().await {
|
||||
let res = execute_key_context
|
||||
.maybe_instrument(state.collect_pending_ios(), |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"EXECUTE_IO",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let converted = match res {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
@@ -1314,16 +1365,31 @@ impl Timeline {
|
||||
"{converted:?}"
|
||||
);
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = execute_key_context
|
||||
.maybe_instrument(
|
||||
walredo_self.reconstruct_value(key, lsn, converted),
|
||||
|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WALREDO",
|
||||
deltas = %walredo_deltas,
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
(key, walredo_res)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
let results = execute_context
|
||||
.maybe_instrument(
|
||||
futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>(),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -3801,18 +3867,34 @@ impl Timeline {
|
||||
return Err(GetVectoredError::Cancelled);
|
||||
}
|
||||
|
||||
let plan_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO_TIMELINE",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let TimelineVisitOutcome {
|
||||
completed_keyspace: completed,
|
||||
image_covered_keyspace,
|
||||
} = Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} = plan_context
|
||||
.maybe_instrument(
|
||||
Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
&plan_context,
|
||||
),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
@@ -3856,8 +3938,26 @@ impl Timeline {
|
||||
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
|
||||
let get_ancestor_context = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_ANCESTOR",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
ancestor = %ancestor_timeline.timeline_id,
|
||||
ancestor_lsn = %timeline.ancestor_lsn
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
timeline_owned = get_ancestor_context
|
||||
.maybe_instrument(
|
||||
timeline.get_ready_ancestor_timeline(ancestor_timeline, &get_ancestor_context),
|
||||
|crnt_perf_span| crnt_perf_span.clone(),
|
||||
)
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user