From 9db63fea7a9c67233255cfcf687633b59b857d8a Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 3 Apr 2025 18:56:51 +0100 Subject: [PATCH] pageserver: optionally export perf traces in OTEL format (#11140) Based on https://github.com/neondatabase/neon/pull/11139 ## Problem We want to export performance traces from the pageserver in OTEL format. End goal is to see them in Grafana. ## Summary of changes https://github.com/neondatabase/neon/pull/11139 introduces the infrastructure required to run the otel collector alongside the pageserver. ### Design Requirements: 1. We'd like to avoid implementing our own performance tracing stack if possible and use the `tracing` crate if possible. 2. Ideally, we'd like zero overhead of a sampling rate of zero and be a be able to change the tracing config for a tenant on the fly. 3. We should leave the current span hierarchy intact. This includes adding perf traces without modifying existing tracing. To satisfy (3) (and (2) in part) a separate span hierarchy is used. `RequestContext` gains an optional `perf_span` member that's only set when the request was chosen by sampling. All perf span related methods added to `RequestContext` are no-ops for requests that are not sampled. This on its own is not enough for (3), so performance spans use a separate tracing subscriber. The `tracing` crate doesn't have great support for this, so there's a fair amount of boilerplate to override the subscriber at all points of the perf span lifecycle. ### Perf Impact [Periodic pagebench](https://neonprod.grafana.net/d/ddqtbfykfqfi8d/e904990?orgId=1&from=2025-02-08T14:15:59.362Z&to=2025-03-10T14:15:59.362Z&timezone=utc) shows no statistically significant regression with a sample ratio of 0. There's an annotation on the dashboard on 2025-03-06. ### Overview of changes: 1. Clean up the `RequestContext` API a bit. Namely, get rid of the `RequestContext::extend` API and use the builder instead. 2. Add pageserver level configs for tracing: sampling ratio, otel endpoint, etc. 3. Introduce some perf span tracking utilities and expose them via `RequestContext`. We add a `tracing::Span` wrapper to be used for perf spans and a `tracing::Instrumented` equivalent for it. See doc comments for reason. 4. Set up OTEL tracing infra according to configuration. A separate runtime is used for the collector. 5. Add perf traces to the read path. ## Refs - epic https://github.com/neondatabase/neon/issues/9873 --------- Co-authored-by: Christian Schwarz --- Cargo.lock | 2 + libs/pageserver_api/Cargo.toml | 1 + libs/pageserver_api/src/config.rs | 50 +++++ libs/tracing-utils/Cargo.toml | 1 + libs/tracing-utils/src/lib.rs | 2 +- libs/tracing-utils/src/perf_span.rs | 153 ++++++++++++++ pageserver/src/bin/pageserver.rs | 41 +++- pageserver/src/config.rs | 15 ++ pageserver/src/context.rs | 186 ++++++++++++++---- pageserver/src/http/routes.rs | 16 +- pageserver/src/lib.rs | 3 + pageserver/src/page_service.rs | 124 ++++++++++-- pageserver/src/pgdatadir_mapping.rs | 73 +++++-- pageserver/src/tenant.rs | 4 +- pageserver/src/tenant/storage_layer.rs | 36 +++- .../src/tenant/storage_layer/delta_layer.rs | 8 +- .../src/tenant/storage_layer/image_layer.rs | 4 +- .../tenant/storage_layer/inmemory_layer.rs | 4 +- pageserver/src/tenant/storage_layer/layer.rs | 68 +++++-- .../src/tenant/storage_layer/layer/tests.rs | 17 +- pageserver/src/tenant/timeline.rs | 132 +++++++++++-- pageserver/src/tenant/timeline/compaction.rs | 4 +- test_runner/fixtures/neon_fixtures.py | 34 ++++ .../fixtures/pageserver/allowed_errors.py | 1 + ...er_max_throughput_getpage_at_latest_lsn.py | 11 ++ 25 files changed, 855 insertions(+), 135 deletions(-) create mode 100644 libs/tracing-utils/src/perf_span.rs diff --git a/Cargo.lock b/Cargo.lock index 194ad90d52..03a376cdae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4329,6 +4329,7 @@ dependencies = [ "strum", "strum_macros", "thiserror 1.0.69", + "tracing-utils", "utils", ] @@ -7603,6 +7604,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "pin-project-lite", "tokio", "tracing", "tracing-opentelemetry", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 87dfdfb5ec..688e9de6e7 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -34,6 +34,7 @@ postgres_backend.workspace = true nix = {workspace = true, optional = true} reqwest.workspace = true rand.workspace = true +tracing-utils.workspace = true [dev-dependencies] bincode.workspace = true diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 47c3136113..66a02b87b0 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -134,6 +134,7 @@ pub struct ConfigToml { pub load_previous_heatmap: Option, #[serde(skip_serializing_if = "Option::is_none")] pub generate_unarchival_heatmap: Option, + pub tracing: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -191,6 +192,54 @@ pub enum GetVectoredConcurrentIo { SidecarTask, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Ratio { + pub numerator: usize, + pub denominator: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct OtelExporterConfig { + pub endpoint: String, + pub protocol: OtelExporterProtocol, + #[serde(with = "humantime_serde")] + pub timeout: Duration, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum OtelExporterProtocol { + Grpc, + HttpBinary, + HttpJson, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Tracing { + pub sampling_ratio: Ratio, + pub export_config: OtelExporterConfig, +} + +impl From<&OtelExporterConfig> for tracing_utils::ExportConfig { + fn from(val: &OtelExporterConfig) -> Self { + tracing_utils::ExportConfig { + endpoint: Some(val.endpoint.clone()), + protocol: val.protocol.into(), + timeout: val.timeout, + } + } +} + +impl From for tracing_utils::Protocol { + fn from(val: OtelExporterProtocol) -> Self { + match val { + OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc, + OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson, + OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary, + } + } +} + pub mod statvfs { pub mod mock { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -537,6 +586,7 @@ impl Default for ConfigToml { validate_wal_contiguity: None, load_previous_heatmap: None, generate_unarchival_heatmap: None, + tracing: None, } } } diff --git a/libs/tracing-utils/Cargo.toml b/libs/tracing-utils/Cargo.toml index 60637d5b24..49a6055b1e 100644 --- a/libs/tracing-utils/Cargo.toml +++ b/libs/tracing-utils/Cargo.toml @@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tracing.workspace = true tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true +pin-project-lite.workspace = true [dev-dependencies] tracing-subscriber.workspace = true # For examples in docs diff --git a/libs/tracing-utils/src/lib.rs b/libs/tracing-utils/src/lib.rs index 74992a7d03..0893aa173b 100644 --- a/libs/tracing-utils/src/lib.rs +++ b/libs/tracing-utils/src/lib.rs @@ -31,10 +31,10 @@ //! .init(); //! } //! ``` -#![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] pub mod http; +pub mod perf_span; use opentelemetry::KeyValue; use opentelemetry::trace::TracerProvider; diff --git a/libs/tracing-utils/src/perf_span.rs b/libs/tracing-utils/src/perf_span.rs new file mode 100644 index 0000000000..f2ca76a816 --- /dev/null +++ b/libs/tracing-utils/src/perf_span.rs @@ -0,0 +1,153 @@ +//! Crutch module to work around tracing infrastructure deficiencies +//! +//! We wish to collect granular request spans without impacting performance +//! by much. Ideally, we should have zero overhead for a sampling rate of 0. +//! +//! The approach taken by the pageserver crate is to use a completely different +//! span hierarchy for the performance spans. Spans are explicitly stored in +//! the request context and use a different [`tracing::Subscriber`] in order +//! to avoid expensive filtering. +//! +//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly, +//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span, +//! the global default [`tracing::Dispatch`] is used. This is problematic if one +//! wishes to juggle different subscribers. +//! +//! In order to work around this, this module provides a [`PerfSpan`] type which +//! wraps a [`Span`] and sets the default subscriber when exiting the span. This +//! achieves the correct routing. +//! +//! There's also a modified version of [`tracing::Instrument`] which works with +//! [`PerfSpan`]. + +use core::{ + future::Future, + marker::Sized, + mem::ManuallyDrop, + pin::Pin, + task::{Context, Poll}, +}; +use pin_project_lite::pin_project; +use tracing::{Dispatch, field, span::Span}; + +#[derive(Debug, Clone)] +pub struct PerfSpan { + inner: ManuallyDrop, + dispatch: Dispatch, +} + +#[must_use = "once a span has been entered, it should be exited"] +pub struct PerfSpanEntered<'a> { + span: &'a PerfSpan, +} + +impl PerfSpan { + pub fn new(span: Span, dispatch: Dispatch) -> Self { + Self { + inner: ManuallyDrop::new(span), + dispatch, + } + } + + pub fn record( + &self, + field: &Q, + value: V, + ) -> &Self { + self.inner.record(field, value); + self + } + + pub fn enter(&self) -> PerfSpanEntered { + if let Some(ref id) = self.inner.id() { + self.dispatch.enter(id); + } + + PerfSpanEntered { span: self } + } + + pub fn inner(&self) -> &Span { + &self.inner + } +} + +impl Drop for PerfSpan { + fn drop(&mut self) { + // Bring the desired dispatch into scope before explicitly calling + // the span destructor. This routes the span exit to the correct + // [`tracing::Subscriber`]. + let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch); + // SAFETY: ManuallyDrop in Drop implementation + unsafe { ManuallyDrop::drop(&mut self.inner) } + } +} + +impl Drop for PerfSpanEntered<'_> { + fn drop(&mut self) { + assert!(self.span.inner.id().is_some()); + + let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch); + self.span.dispatch.exit(&self.span.inner.id().unwrap()); + } +} + +pub trait PerfInstrument: Sized { + fn instrument(self, span: PerfSpan) -> PerfInstrumented { + PerfInstrumented { + inner: ManuallyDrop::new(self), + span, + } + } +} + +pin_project! { + #[project = PerfInstrumentedProj] + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PerfInstrumented { + // `ManuallyDrop` is used here to to enter instrument `Drop` by entering + // `Span` and executing `ManuallyDrop::drop`. + #[pin] + inner: ManuallyDrop, + span: PerfSpan, + } + + impl PinnedDrop for PerfInstrumented { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + let _enter = this.span.enter(); + // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't + // different from wrapping `T` in `Option` and calling + // `Pin::set(&mut this.inner, None)`, except avoiding + // additional memory overhead. + // 2. `ManuallyDrop::drop()` is safe, because + // `PinnedDrop::drop()` is guaranteed to be called only + // once. + unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) } + } + } +} + +impl<'a, T> PerfInstrumentedProj<'a, T> { + /// Get a mutable reference to the [`Span`] a pinned mutable reference to + /// the wrapped type. + fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) { + // SAFETY: As long as `ManuallyDrop` does not move, `T` won't move + // and `inner` is valid, because `ManuallyDrop::drop` is called + // only inside `Drop` of the `Instrumented`. + let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) }; + (self.span, inner) + } +} + +impl Future for PerfInstrumented { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (span, inner) = self.project().span_and_inner_pin_mut(); + let _enter = span.enter(); + inner.poll(cx) + } +} + +impl PerfInstrument for T {} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4cfc0c24f8..a575904efa 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -35,6 +35,7 @@ use tokio::signal::unix::SignalKind; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::*; +use tracing_utils::OtelGuard; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::crashsafe::syncfs; use utils::logging::TracingErrorLayerEnablement; @@ -118,6 +119,21 @@ fn main() -> anyhow::Result<()> { logging::Output::Stdout, )?; + let otel_enablement = match &conf.tracing { + Some(cfg) => tracing_utils::OtelEnablement::Enabled { + service_name: "pageserver".to_string(), + export_config: (&cfg.export_config).into(), + runtime: *COMPUTE_REQUEST_RUNTIME, + }, + None => tracing_utils::OtelEnablement::Disabled, + }; + + let otel_guard = tracing_utils::init_performance_tracing(otel_enablement); + + if otel_guard.is_some() { + info!(?conf.tracing, "starting with OTEL tracing enabled"); + } + // mind the order required here: 1. logging, 2. panic_hook, 3. sentry. // disarming this hook on pageserver, because we never tear down tracing. logging::replace_panic_hook_with_tracing_panic_hook().forget(); @@ -191,7 +207,7 @@ fn main() -> anyhow::Result<()> { tracing::info!("Initializing page_cache..."); page_cache::init(conf.page_cache_size); - start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; + start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?; scenario.teardown(); Ok(()) @@ -290,6 +306,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) { fn start_pageserver( launch_ts: &'static LaunchTimestamp, conf: &'static PageServerConf, + otel_guard: Option, ) -> anyhow::Result<()> { // Monotonic time for later calculating startup duration let started_startup_at = Instant::now(); @@ -675,13 +692,21 @@ fn start_pageserver( // Spawn a task to listen for libpq connections. It will spawn further tasks // for each connection. We created the listener earlier already. - let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, { - let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it - pageserver_listener - .set_nonblocking(true) - .context("set listener to nonblocking")?; - tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")? - }); + let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone()); + let page_service = page_service::spawn( + conf, + tenant_manager.clone(), + pg_auth, + perf_trace_dispatch, + { + let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it + pageserver_listener + .set_nonblocking(true) + .context("set listener to nonblocking")?; + tokio::net::TcpListener::from_std(pageserver_listener) + .context("create tokio listener")? + }, + ); // All started up! Now just sit and wait for shutdown signal. BACKGROUND_RUNTIME.block_on(async move { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c336f22f8e..d9a5f8c381 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -215,6 +215,8 @@ pub struct PageServerConf { /// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline. pub generate_unarchival_heatmap: bool, + + pub tracing: Option, } /// Token for authentication to safekeepers @@ -386,6 +388,7 @@ impl PageServerConf { validate_wal_contiguity, load_previous_heatmap, generate_unarchival_heatmap, + tracing, } = config_toml; let mut conf = PageServerConf { @@ -435,6 +438,7 @@ impl PageServerConf { wal_receiver_protocol, page_service_pipelining, get_vectored_concurrent_io, + tracing, // ------------------------------------------------------------ // fields that require additional validation or custom handling @@ -506,6 +510,17 @@ impl PageServerConf { ); } + if let Some(tracing_config) = conf.tracing.as_ref() { + let ratio = &tracing_config.sampling_ratio; + ensure!( + ratio.denominator != 0 && ratio.denominator >= ratio.numerator, + format!( + "Invalid sampling ratio: {}/{}", + ratio.numerator, ratio.denominator + ) + ); + } + IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance) .map_err(anyhow::Error::msg) .with_context(|| { diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index d2caf030df..279d2daf75 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -100,6 +100,12 @@ use crate::{ task_mgr::TaskKind, tenant::Timeline, }; +use futures::FutureExt; +use futures::future::BoxFuture; +use std::future::Future; +use tracing_utils::perf_span::{PerfInstrument, PerfSpan}; + +use tracing::{Dispatch, Span}; // The main structure of this module, see module-level comment. pub struct RequestContext { @@ -109,6 +115,8 @@ pub struct RequestContext { page_content_kind: PageContentKind, read_path_debug: bool, scope: Scope, + perf_span: Option, + perf_span_dispatch: Option, } #[derive(Clone)] @@ -263,22 +271,15 @@ impl RequestContextBuilder { page_content_kind: PageContentKind::Unknown, read_path_debug: false, scope: Scope::new_global(), + perf_span: None, + perf_span_dispatch: None, }, } } - pub fn extend(original: &RequestContext) -> Self { + pub fn from(original: &RequestContext) -> Self { Self { - // This is like a Copy, but avoid implementing Copy because ordinary users of - // RequestContext should always move or ref it. - inner: RequestContext { - task_kind: original.task_kind, - download_behavior: original.download_behavior, - access_stats_behavior: original.access_stats_behavior, - page_content_kind: original.page_content_kind, - read_path_debug: original.read_path_debug, - scope: original.scope.clone(), - }, + inner: original.clone(), } } @@ -316,12 +317,74 @@ impl RequestContextBuilder { self } - pub fn build(self) -> RequestContext { + pub(crate) fn perf_span_dispatch(mut self, dispatch: Option) -> Self { + self.inner.perf_span_dispatch = dispatch; + self + } + + pub fn root_perf_span(mut self, make_span: Fn) -> Self + where + Fn: FnOnce() -> Span, + { + assert!(self.inner.perf_span.is_none()); + assert!(self.inner.perf_span_dispatch.is_some()); + + let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap(); + let new_span = tracing::dispatcher::with_default(dispatcher, make_span); + + self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone())); + + self + } + + pub fn perf_span(mut self, make_span: Fn) -> Self + where + Fn: FnOnce(&Span) -> Span, + { + if let Some(ref perf_span) = self.inner.perf_span { + assert!(self.inner.perf_span_dispatch.is_some()); + let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap(); + + let new_span = + tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner())); + + self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone())); + } + + self + } + + pub fn root(self) -> RequestContext { + self.inner + } + + pub fn attached_child(self) -> RequestContext { + self.inner + } + + pub fn detached_child(self) -> RequestContext { self.inner } } impl RequestContext { + /// Private clone implementation + /// + /// Callers should use the [`RequestContextBuilder`] or child spaning APIs of + /// [`RequestContext`]. + fn clone(&self) -> Self { + Self { + task_kind: self.task_kind, + download_behavior: self.download_behavior, + access_stats_behavior: self.access_stats_behavior, + page_content_kind: self.page_content_kind, + read_path_debug: self.read_path_debug, + scope: self.scope.clone(), + perf_span: self.perf_span.clone(), + perf_span_dispatch: self.perf_span_dispatch.clone(), + } + } + /// Create a new RequestContext that has no parent. /// /// The function is called `new` because, once we add children @@ -337,7 +400,7 @@ impl RequestContext { pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { RequestContextBuilder::new(task_kind) .download_behavior(download_behavior) - .build() + .root() } /// Create a detached child context for a task that may outlive `self`. @@ -358,7 +421,10 @@ impl RequestContext { /// /// We could make new calls to this function fail if `self` is already canceled. pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - self.child_impl(task_kind, download_behavior) + RequestContextBuilder::from(self) + .task_kind(task_kind) + .download_behavior(download_behavior) + .detached_child() } /// Create a child of context `self` for a task that shall not outlive `self`. @@ -382,7 +448,7 @@ impl RequestContext { /// The method to wait for child tasks would return an error, indicating /// that the child task was not started because the context was canceled. pub fn attached_child(&self) -> Self { - self.child_impl(self.task_kind(), self.download_behavior()) + RequestContextBuilder::from(self).attached_child() } /// Use this function when you should be creating a child context using @@ -397,17 +463,10 @@ impl RequestContext { Self::new(task_kind, download_behavior) } - fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - RequestContextBuilder::extend(self) - .task_kind(task_kind) - .download_behavior(download_behavior) - .build() - } - pub fn with_scope_timeline(&self, timeline: &Arc) -> Self { - RequestContextBuilder::extend(self) + RequestContextBuilder::from(self) .scope(Scope::new_timeline(timeline)) - .build() + .attached_child() } pub(crate) fn with_scope_page_service_pagestream( @@ -416,9 +475,9 @@ impl RequestContext { crate::page_service::TenantManagerTypes, >, ) -> Self { - RequestContextBuilder::extend(self) + RequestContextBuilder::from(self) .scope(Scope::new_page_service_pagestream(timeline_handle)) - .build() + .attached_child() } pub fn with_scope_secondary_timeline( @@ -426,28 +485,30 @@ impl RequestContext { tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, ) -> Self { - RequestContextBuilder::extend(self) + RequestContextBuilder::from(self) .scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id)) - .build() + .attached_child() } pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self { - RequestContextBuilder::extend(self) + RequestContextBuilder::from(self) .scope(Scope::new_secondary_tenant(tenant_shard_id)) - .build() + .attached_child() } #[cfg(test)] pub fn with_scope_unit_test(&self) -> Self { - RequestContextBuilder::new(TaskKind::UnitTest) + RequestContextBuilder::from(self) + .task_kind(TaskKind::UnitTest) .scope(Scope::new_unit_test()) - .build() + .attached_child() } pub fn with_scope_debug_tools(&self) -> Self { - RequestContextBuilder::new(TaskKind::DebugTool) + RequestContextBuilder::from(self) + .task_kind(TaskKind::DebugTool) .scope(Scope::new_debug_tools()) - .build() + .attached_child() } pub fn task_kind(&self) -> TaskKind { @@ -504,4 +565,61 @@ impl RequestContext { Scope::DebugTools { io_size_metrics } => io_size_metrics, } } + + pub(crate) fn perf_follows_from(&self, from: &RequestContext) { + if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) { + span.inner().follows_from(from_span.inner()); + } + } + + pub(crate) fn perf_span_record< + Q: tracing::field::AsField + ?Sized, + V: tracing::field::Value, + >( + &self, + field: &Q, + value: V, + ) { + if let Some(span) = &self.perf_span { + span.record(field, value); + } + } + + pub(crate) fn has_perf_span(&self) -> bool { + self.perf_span.is_some() + } } + +/// [`Future`] extension trait that allow for creating performance +/// spans on sampled requests +pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send { + /// Instrument this future with a new performance span when the + /// provided request context indicates the originator request + /// was sampled. Otherwise, just box the future and return it as is. + fn maybe_perf_instrument( + self, + ctx: &RequestContext, + make_span: Fn, + ) -> BoxFuture<'a, Self::Output> + where + Self: Sized + 'a, + Fn: FnOnce(&Span) -> Span, + { + match &ctx.perf_span { + Some(perf_span) => { + assert!(ctx.perf_span_dispatch.is_some()); + let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap(); + + let new_span = + tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner())); + + let new_perf_span = PerfSpan::new(new_span, dispatcher.clone()); + self.instrument(new_perf_span).boxed() + } + None => self.boxed(), + } + } +} + +// Implement the trait for all types that satisfy the trait bounds +impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8dcb654e59..cf67dc596a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2697,11 +2697,12 @@ async fn getpage_at_lsn_handler_inner( let lsn: Option = parse_query_param(&request, "lsn")?; async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - // Enable read path debugging let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; - let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true) - .scope(context::Scope::new_timeline(&timeline)).build(); + let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest) + .download_behavior(DownloadBehavior::Download) + .scope(context::Scope::new_timeline(&timeline)) + .read_path_debug(true) + .root(); // Use last_record_lsn if no lsn is provided let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); @@ -3433,14 +3434,15 @@ async fn put_tenant_timeline_import_wal( check_permission(&request, Some(tenant_id))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); - let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn); async move { let state = get_state(&request); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?; - let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build(); + let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest) + .download_behavior(DownloadBehavior::Warn) + .scope(context::Scope::new_timeline(&timeline)) + .root(); let mut body = StreamReader::new(request.into_body().map(|res| { res.map_err(|error| { diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8373d0bd87..bda218444d 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16; pub const IMAGE_FILE_MAGIC: u16 = 0x5A60; pub const DELTA_FILE_MAGIC: u16 = 0x5A61; +// Target used for performance traces. +pub const PERF_TRACE_TARGET: &str = "P"; + static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); pub use crate::metrics::preinitialize_metrics; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2ed3e0ecb0..3ebd6d8506 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; +use crate::PERF_TRACE_TARGET; use anyhow::{Context, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; @@ -17,7 +18,7 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::config::{ PageServicePipeliningConfig, PageServicePipeliningConfigPipelined, - PageServiceProtocolPipelinedExecutionStrategy, + PageServiceProtocolPipelinedExecutionStrategy, Tracing, }; use pageserver_api::key::rel_block_to_key; use pageserver_api::models::{ @@ -36,6 +37,7 @@ use postgres_ffi::BLCKSZ; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use pq_proto::framed::ConnectionError; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor}; +use rand::Rng; use strum_macros::IntoStaticStr; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; @@ -53,7 +55,9 @@ use utils::sync::spsc_fold; use crate::auth::check_permission; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; -use crate::context::{DownloadBehavior, RequestContext}; +use crate::context::{ + DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, +}; use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer, TimelineMetrics, @@ -100,6 +104,7 @@ pub fn spawn( conf: &'static PageServerConf, tenant_manager: Arc, pg_auth: Option>, + perf_trace_dispatch: Option, tcp_listener: tokio::net::TcpListener, ) -> Listener { let cancel = CancellationToken::new(); @@ -117,6 +122,7 @@ pub fn spawn( conf, tenant_manager, pg_auth, + perf_trace_dispatch, tcp_listener, conf.pg_auth_type, conf.page_service_pipelining.clone(), @@ -173,6 +179,7 @@ pub async fn libpq_listener_main( conf: &'static PageServerConf, tenant_manager: Arc, auth: Option>, + perf_trace_dispatch: Option, listener: tokio::net::TcpListener, auth_type: AuthType, pipelining_config: PageServicePipeliningConfig, @@ -205,8 +212,12 @@ pub async fn libpq_listener_main( // Connection established. Spawn a new task to handle it. debug!("accepted connection from {}", peer_addr); let local_auth = auth.clone(); - let connection_ctx = listener_ctx - .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download); + let connection_ctx = RequestContextBuilder::from(&listener_ctx) + .task_kind(TaskKind::PageRequestHandler) + .download_behavior(DownloadBehavior::Download) + .perf_span_dispatch(perf_trace_dispatch.clone()) + .detached_child(); + connection_handler_tasks.spawn(page_service_conn_main( conf, tenant_manager.clone(), @@ -607,6 +618,7 @@ impl std::fmt::Display for BatchedPageStreamError { struct BatchedGetPageRequest { req: PagestreamGetPageRequest, timer: SmgrOpTimer, + ctx: RequestContext, } #[cfg(feature = "testing")] @@ -743,6 +755,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, timeline_handles: &mut TimelineHandles, + tracing_config: Option<&Tracing>, cancel: &CancellationToken, ctx: &RequestContext, protocol_version: PagestreamProtocolVersion, @@ -902,10 +915,51 @@ impl PageServerHandler { } let key = rel_block_to_key(req.rel, req.blkno); - let shard = match timeline_handles + + let sampled = match tracing_config { + Some(conf) => { + let ratio = &conf.sampling_ratio; + + if ratio.numerator == 0 { + false + } else { + rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator + } + } + None => false, + }; + + let ctx = if sampled { + RequestContextBuilder::from(ctx) + .root_perf_span(|| { + info_span!( + target: PERF_TRACE_TARGET, + "GET_PAGE", + tenant_id = %tenant_id, + shard_id = field::Empty, + timeline_id = %timeline_id, + lsn = %req.hdr.request_lsn, + request_id = %req.hdr.reqid, + key = %key, + ) + }) + .attached_child() + } else { + ctx.attached_child() + }; + + let res = timeline_handles .get(tenant_id, timeline_id, ShardSelector::Page(key)) - .await - { + .maybe_perf_instrument(&ctx, |current_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: current_perf_span, + "SHARD_SELECTION", + ) + }) + .await; + + let shard = match res { Ok(tl) => tl, Err(e) => { let span = mkspan!(before shard routing); @@ -932,26 +986,60 @@ impl PageServerHandler { } } }; + + // This ctx travels as part of the BatchedFeMessage through + // batching into the request handler. + // The request handler needs to do some per-request work + // (relsize check) before dispatching the batch as a single + // get_vectored call to the Timeline. + // This ctx will be used for the reslize check, whereas the + // get_vectored call will be a different ctx with separate + // perf span. + let ctx = ctx.with_scope_page_service_pagestream(&shard); + + // Similar game for this `span`: we funnel it through so that + // request handler log messages contain the request-specific fields. let span = mkspan!(shard.tenant_shard_id.shard_slug()); + // Enrich the perf span with shard_id now that shard routing is done. + ctx.perf_span_record( + "shard_id", + tracing::field::display(shard.get_shard_identity().shard_slug()), + ); + let timer = record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetPageAtLsn, received_at, ) + .maybe_perf_instrument(&ctx, |current_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: current_perf_span, + "THROTTLE", + ) + }) .await?; // We're holding the Handle - let effective_request_lsn = match Self::wait_or_get_last_lsn( + // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait + let res = Self::wait_or_get_last_lsn( &shard, req.hdr.request_lsn, req.hdr.not_modified_since, &shard.get_applied_gc_cutoff_lsn(), - ctx, + &ctx, ) - // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait - .await - { + .maybe_perf_instrument(&ctx, |current_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: current_perf_span, + "WAIT_LSN", + ) + }) + .await; + + let effective_request_lsn = match res { Ok(lsn) => lsn, Err(e) => { return respond_error!(span, e); @@ -961,7 +1049,7 @@ impl PageServerHandler { span, shard: shard.downgrade(), effective_request_lsn, - pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }], + pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }], } } #[cfg(feature = "testing")] @@ -1514,12 +1602,15 @@ impl PageServerHandler { IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { let cancel = self.cancel.clone(); + let tracing_config = self.conf.tracing.clone(); + let err = loop { let msg = Self::pagestream_read_message( &mut pgb_reader, tenant_id, timeline_id, &mut timeline_handles, + tracing_config.as_ref(), &cancel, ctx, protocol_version, @@ -1653,6 +1744,8 @@ impl PageServerHandler { // Batcher // + let tracing_config = self.conf.tracing.clone(); + let cancel_batcher = self.cancel.child_token(); let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| { @@ -1666,6 +1759,7 @@ impl PageServerHandler { tenant_id, timeline_id, &mut timeline_handles, + tracing_config.as_ref(), &cancel_batcher, &ctx, protocol_version, @@ -2004,7 +2098,9 @@ impl PageServerHandler { let results = timeline .get_rel_page_at_lsn_batched( - requests.iter().map(|p| (&p.req.rel, &p.req.blkno)), + requests + .iter() + .map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())), effective_lsn, io_concurrency, ctx, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 4685f9383b..e3e06ab91a 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -9,6 +9,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; use std::ops::{ControlFlow, Range}; +use crate::PERF_TRACE_TARGET; use anyhow::{Context, ensure}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; @@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, info_span, trace, warn}; use utils::bin_ser::{BeSer, DeserializeError}; use utils::lsn::Lsn; use utils::pausable_failpoint; @@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use super::tenant::{PageReconstructError, Timeline}; use crate::aux_file; -use crate::context::RequestContext; +use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder}; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::metrics::{ RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD, @@ -209,7 +210,9 @@ impl Timeline { let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self .get_rel_page_at_lsn_batched( - pages.iter().map(|(tag, blknum)| (tag, blknum)), + pages + .iter() + .map(|(tag, blknum)| (tag, blknum, ctx.attached_child())), effective_lsn, io_concurrency.clone(), ctx, @@ -248,7 +251,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: impl ExactSizeIterator, + pages: impl ExactSizeIterator, effective_lsn: Lsn, io_concurrency: IoConcurrency, ctx: &RequestContext, @@ -262,8 +265,11 @@ impl Timeline { let mut result = Vec::with_capacity(pages.len()); let result_slots = result.spare_capacity_mut(); - let mut keys_slots: BTreeMap> = BTreeMap::default(); - for (response_slot_idx, (tag, blknum)) in pages.enumerate() { + let mut keys_slots: BTreeMap> = + BTreeMap::default(); + + let mut perf_instrument = false; + for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(), @@ -274,7 +280,16 @@ impl Timeline { } let nblocks = match self - .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx) + .get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "GET_REL_SIZE", + reltag=%tag, + lsn=%effective_lsn, + ) + }) .await { Ok(nblocks) => nblocks, @@ -297,8 +312,12 @@ impl Timeline { let key = rel_block_to_key(*tag, *blknum); + if ctx.has_perf_span() { + perf_instrument = true; + } + let key_slots = keys_slots.entry(key).or_default(); - key_slots.push(response_slot_idx); + key_slots.push((response_slot_idx, ctx)); } let keyspace = { @@ -314,16 +333,34 @@ impl Timeline { acc.to_keyspace() }; - match self - .get_vectored(keyspace, effective_lsn, io_concurrency, ctx) - .await - { + let ctx = match perf_instrument { + true => RequestContextBuilder::from(ctx) + .root_perf_span(|| { + info_span!( + target: PERF_TRACE_TARGET, + "GET_VECTORED", + tenant_id = %self.tenant_shard_id.tenant_id, + timeline_id = %self.timeline_id, + lsn = %effective_lsn, + shard = %self.tenant_shard_id.shard_slug(), + ) + }) + .attached_child(), + false => ctx.attached_child(), + }; + + let res = self + .get_vectored(keyspace, effective_lsn, io_concurrency, &ctx) + .maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone()) + .await; + + match res { Ok(results) => { for (key, res) in results { let mut key_slots = keys_slots.remove(&key).unwrap().into_iter(); - let first_slot = key_slots.next().unwrap(); + let (first_slot, first_req_ctx) = key_slots.next().unwrap(); - for slot in key_slots { + for (slot, req_ctx) in key_slots { let clone = match &res { Ok(buf) => Ok(buf.clone()), Err(err) => Err(match err { @@ -341,17 +378,22 @@ impl Timeline { }; result_slots[slot].write(clone); + // There is no standardized way to express that the batched span followed from N request spans. + // So, abuse the system and mark the request contexts as follows_from the batch span, so we get + // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for. + req_ctx.perf_follows_from(&ctx); slots_filled += 1; } result_slots[first_slot].write(res); + first_req_ctx.perf_follows_from(&ctx); slots_filled += 1; } } Err(err) => { // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size // (We enforce the max batch size outside of this function, in the code that constructs the batch request.) - for slot in keys_slots.values().flatten() { + for (slot, req_ctx) in keys_slots.values().flatten() { // this whole `match` is a lot like `From for PageReconstructError` // but without taking ownership of the GetVectoredError let err = match &err { @@ -383,6 +425,7 @@ impl Timeline { } }; + req_ctx.perf_follows_from(&ctx); result_slots[*slot].write(err); } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0384fcc39f..441597d77f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4205,9 +4205,9 @@ impl Tenant { self.cancel.child_token(), ); - let timeline_ctx = RequestContextBuilder::extend(ctx) + let timeline_ctx = RequestContextBuilder::from(ctx) .scope(context::Scope::new_timeline(&timeline)) - .build(); + .detached_child(); Ok((timeline, timeline_ctx)) } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index ece163b24a..2ea0c1b979 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -13,13 +13,13 @@ pub mod merge_iterator; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; -use std::future::Future; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use crate::PERF_TRACE_TARGET; pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter}; use bytes::Bytes; pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef}; @@ -34,7 +34,7 @@ use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::record::NeonWalRecord; use pageserver_api::value::Value; -use tracing::{Instrument, trace}; +use tracing::{Instrument, info_span, trace}; use utils::lsn::Lsn; use utils::sync::gate::GateGuard; @@ -43,7 +43,9 @@ use super::PageReconstructError; use super::layer_map::InMemoryLayerDesc; use super::timeline::{GetVectoredError, ReadPath}; use crate::config::PageServerConf; -use crate::context::{AccessStatsBehavior, RequestContext}; +use crate::context::{ + AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, +}; pub fn range_overlaps(a: &Range, b: &Range) -> bool where @@ -874,13 +876,37 @@ impl ReadableLayer { ) -> Result<(), GetVectoredError> { match self { ReadableLayer::PersistentLayer(layer) => { + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "PLAN_LAYER", + layer = %layer + ) + }) + .attached_child(); + layer - .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx) + .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await } ReadableLayer::InMemoryLayer(layer) => { + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "PLAN_LAYER", + layer = %layer + ) + }) + .attached_child(); + layer - .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx) + .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 62adae1680..05b0bc1a5c 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -896,9 +896,9 @@ impl DeltaLayerInner { where Reader: BlockReader + Clone, { - let ctx = RequestContextBuilder::extend(ctx) + let ctx = RequestContextBuilder::from(ctx) .page_content_kind(PageContentKind::DeltaLayerBtreeNode) - .build(); + .attached_child(); for range in keyspace.ranges.iter() { let mut range_end_handled = false; @@ -1105,9 +1105,9 @@ impl DeltaLayerInner { all_keys.push(entry); true }, - &RequestContextBuilder::extend(ctx) + &RequestContextBuilder::from(ctx) .page_content_kind(PageContentKind::DeltaLayerBtreeNode) - .build(), + .attached_child(), ) .await?; if let Some(last) = all_keys.last_mut() { diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b211eb5416..3243b73942 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -481,9 +481,9 @@ impl ImageLayerInner { let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); - let ctx = RequestContextBuilder::extend(ctx) + let ctx = RequestContextBuilder::from(ctx) .page_content_kind(PageContentKind::ImageLayerBtreeNode) - .build(); + .attached_child(); for range in keyspace.ranges.iter() { let mut range_end_handled = false; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index bb4ae38ad1..388ed3201c 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -421,9 +421,9 @@ impl InMemoryLayer { reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { - let ctx = RequestContextBuilder::extend(ctx) + let ctx = RequestContextBuilder::from(ctx) .page_content_kind(PageContentKind::InMemoryLayer) - .build(); + .attached_child(); let inner = self.inner.read().await; diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 247092bf45..39665d2cc2 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; use std::time::{Duration, SystemTime}; +use crate::PERF_TRACE_TARGET; use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::HistoricLayerInfo; use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId}; -use tracing::Instrument; +use tracing::{Instrument, info_span}; use utils::generation::Generation; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -18,7 +19,7 @@ use super::delta_layer::{self}; use super::image_layer::{self}; use super::{ AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName, - LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState, + LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState, }; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder}; @@ -324,16 +325,29 @@ impl Layer { reconstruct_data: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { - let downloaded = + let downloaded = { + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "GET_LAYER", + ) + }) + .attached_child(); + self.0 - .get_or_maybe_download(true, ctx) + .get_or_maybe_download(true, &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone()) .await .map_err(|err| match err { DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => { GetVectoredError::Cancelled } other => GetVectoredError::Other(anyhow::anyhow!(other)), - })?; + })? + }; + let this = ResidentLayer { downloaded: downloaded.clone(), owner: self.clone(), @@ -341,9 +355,20 @@ impl Layer { self.record_access(ctx); + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "VISIT_LAYER", + ) + }) + .attached_child(); + downloaded - .get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx) + .get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx) .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await .map_err(|err| match err { GetVectoredError::Other(err) => GetVectoredError::Other( @@ -1045,15 +1070,34 @@ impl LayerInner { return Err(DownloadError::DownloadRequired); } - let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download); + let ctx = if ctx.has_perf_span() { + let dl_ctx = RequestContextBuilder::from(ctx) + .task_kind(TaskKind::LayerDownload) + .download_behavior(DownloadBehavior::Download) + .root_perf_span(|| { + info_span!( + target: PERF_TRACE_TARGET, + "DOWNLOAD_LAYER", + layer = %self, + reason = %reason + ) + }) + .detached_child(); + ctx.perf_follows_from(&dl_ctx); + dl_ctx + } else { + ctx.attached_child() + }; async move { tracing::info!(%reason, "downloading on-demand"); let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); let res = self - .download_init_and_wait(timeline, permit, download_ctx) + .download_init_and_wait(timeline, permit, ctx.attached_child()) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await?; + scopeguard::ScopeGuard::into_inner(init_cancelled); Ok(res) } @@ -1720,9 +1764,9 @@ impl DownloadedLayer { ); let res = if owner.desc.is_delta { - let ctx = RequestContextBuilder::extend(ctx) + let ctx = RequestContextBuilder::from(ctx) .page_content_kind(crate::context::PageContentKind::DeltaLayerSummary) - .build(); + .attached_child(); let summary = Some(delta_layer::Summary::expected( owner.desc.tenant_shard_id.tenant_id, owner.desc.timeline_id, @@ -1738,9 +1782,9 @@ impl DownloadedLayer { .await .map(LayerKind::Delta) } else { - let ctx = RequestContextBuilder::extend(ctx) + let ctx = RequestContextBuilder::from(ctx) .page_content_kind(crate::context::PageContentKind::ImageLayerSummary) - .build(); + .attached_child(); let lsn = owner.desc.image_layer_lsn(); let summary = Some(image_layer::Summary::expected( owner.desc.tenant_shard_id.tenant_id, diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 7086429bfe..b6fd4678d6 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -119,6 +119,10 @@ async fn smoke_test() { let e = layer.evict_and_wait(FOREVER).await.unwrap_err(); assert!(matches!(e, EvictionError::NotFound)); + let dl_ctx = RequestContextBuilder::from(ctx) + .download_behavior(DownloadBehavior::Download) + .attached_child(); + // on accesses when the layer is evicted, it will automatically be downloaded. let img_after = { let mut data = ValuesReconstructState::new(io_concurrency.clone()); @@ -127,7 +131,7 @@ async fn smoke_test() { controlfile_keyspace.clone(), Lsn(0x10)..Lsn(0x11), &mut data, - ctx, + &dl_ctx, ) .instrument(download_span.clone()) .await @@ -177,7 +181,7 @@ async fn smoke_test() { // plain downloading is rarely needed layer - .download_and_keep_resident(ctx) + .download_and_keep_resident(&dl_ctx) .instrument(download_span) .await .unwrap(); @@ -645,9 +649,10 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { let ctx = ctx.with_scope_timeline(&timeline); // This test does downloads - let ctx = RequestContextBuilder::extend(&ctx) + let ctx = RequestContextBuilder::from(&ctx) .download_behavior(DownloadBehavior::Download) - .build(); + .attached_child(); + let layer = { let mut layers = { let layers = timeline.layers.read().await; @@ -730,9 +735,9 @@ async fn evict_and_wait_does_not_wait_for_download() { let ctx = ctx.with_scope_timeline(&timeline); // This test does downloads - let ctx = RequestContextBuilder::extend(&ctx) + let ctx = RequestContextBuilder::from(&ctx) .download_behavior(DownloadBehavior::Download) - .build(); + .attached_child(); let layer = { let mut layers = { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 80a23bfa94..74e97653d2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use crate::PERF_TRACE_TARGET; use anyhow::{Context, Result, anyhow, bail, ensure}; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; @@ -96,7 +97,9 @@ use super::{ }; use crate::aux_file::AuxFileSizeEstimator; use crate::config::PageServerConf; -use crate::context::{DownloadBehavior, RequestContext}; +use crate::context::{ + DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, +}; use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32}; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::l0_flush::{self, L0FlushGlobalState}; @@ -1289,9 +1292,22 @@ impl Timeline { }; reconstruct_state.read_path = read_path; - let traversal_res: Result<(), _> = self - .get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx) - .await; + let traversal_res: Result<(), _> = { + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "PLAN_IO", + ) + }) + .attached_child(); + + self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) + .await + }; + if let Err(err) = traversal_res { // Wait for all the spawned IOs to complete. // See comments on `spawn_io` inside `storage_layer` for more details. @@ -1305,14 +1321,46 @@ impl Timeline { let layers_visited = reconstruct_state.get_layers_visited(); + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "RECONSTRUCT", + ) + }) + .attached_child(); + let futs = FuturesUnordered::new(); for (key, state) in std::mem::take(&mut reconstruct_state.keys) { futs.push({ let walredo_self = self.myself.upgrade().expect("&self method holds the arc"); + let ctx = RequestContextBuilder::from(&ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "RECONSTRUCT_KEY", + key = %key, + ) + }) + .attached_child(); + async move { assert_eq!(state.situation, ValueReconstructSituation::Complete); - let converted = match state.collect_pending_ios().await { + let res = state + .collect_pending_ios() + .maybe_perf_instrument(&ctx, |crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "WAIT_FOR_IO_COMPLETIONS", + ) + }) + .await; + + let converted = match res { Ok(ok) => ok, Err(err) => { return (key, Err(err)); @@ -1329,16 +1377,27 @@ impl Timeline { "{converted:?}" ); - ( - key, - walredo_self.reconstruct_value(key, lsn, converted).await, - ) + let walredo_deltas = converted.num_deltas(); + let walredo_res = walredo_self + .reconstruct_value(key, lsn, converted) + .maybe_perf_instrument(&ctx, |crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "WALREDO", + deltas = %walredo_deltas, + ) + }) + .await; + + (key, walredo_res) } }); } let results = futs .collect::>>() + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await; // For aux file keys (v1 or v2) the vectored read path does not return an error @@ -3875,15 +3934,30 @@ impl Timeline { let TimelineVisitOutcome { completed_keyspace: completed, image_covered_keyspace, - } = Self::get_vectored_reconstruct_data_timeline( - timeline, - keyspace.clone(), - cont_lsn, - reconstruct_state, - &self.cancel, - ctx, - ) - .await?; + } = { + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "PLAN_IO_TIMELINE", + timeline = %timeline.timeline_id, + lsn = %cont_lsn, + ) + }) + .attached_child(); + + Self::get_vectored_reconstruct_data_timeline( + timeline, + keyspace.clone(), + cont_lsn, + reconstruct_state, + &self.cancel, + &ctx, + ) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) + .await? + }; keyspace.remove_overlapping_with(&completed); @@ -3927,8 +4001,24 @@ impl Timeline { // Take the min to avoid reconstructing a page with data newer than request Lsn. cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1)); + + let ctx = RequestContextBuilder::from(ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "GET_ANCESTOR", + timeline = %timeline.timeline_id, + lsn = %cont_lsn, + ancestor = %ancestor_timeline.timeline_id, + ancestor_lsn = %timeline.ancestor_lsn + ) + }) + .attached_child(); + timeline_owned = timeline - .get_ready_ancestor_timeline(ancestor_timeline, ctx) + .get_ready_ancestor_timeline(ancestor_timeline, &ctx) + .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await?; timeline = &*timeline_owned; }; @@ -7259,9 +7349,9 @@ mod tests { eprintln!("Downloading {layer} and re-generating heatmap"); - let ctx = &RequestContextBuilder::extend(ctx) + let ctx = &RequestContextBuilder::from(ctx) .download_behavior(crate::context::DownloadBehavior::Download) - .build(); + .attached_child(); let _resident = layer .download_and_keep_resident(ctx) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 9693d232ee..2ebb1d50cd 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1029,9 +1029,9 @@ impl Timeline { { Ok(((dense_partitioning, sparse_partitioning), lsn)) => { // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them - let image_ctx = RequestContextBuilder::extend(ctx) + let image_ctx = RequestContextBuilder::from(ctx) .access_stats_behavior(AccessStatsBehavior::Skip) - .build(); + .attached_child(); let mut partitioning = dense_partitioning; partitioning diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 7931a0a7d0..11ff2921b9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum): raise ValueError(f"Unknown protocol type: {proto}") +@dataclass +class PageserverTracingConfig: + sampling_ratio: tuple[int, int] + endpoint: str + protocol: str + timeout: str + + def to_config_key_value(self) -> tuple[str, dict[str, Any]]: + value = { + "sampling_ratio": { + "numerator": self.sampling_ratio[0], + "denominator": self.sampling_ratio[1], + }, + "export_config": { + "endpoint": self.endpoint, + "protocol": self.protocol, + "timeout": self.timeout, + }, + } + return ("tracing", value) + + class NeonEnvBuilder: """ Builder object to create a Neon runtime environment @@ -425,6 +447,7 @@ class NeonEnvBuilder: pageserver_virtual_file_io_mode: str | None = None, pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None, pageserver_get_vectored_concurrent_io: str | None = None, + pageserver_tracing_config: PageserverTracingConfig | None = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -478,6 +501,8 @@ class NeonEnvBuilder: pageserver_get_vectored_concurrent_io ) + self.pageserver_tracing_config = pageserver_tracing_config + self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = ( pageserver_default_tenant_config_compaction_algorithm ) @@ -1138,6 +1163,7 @@ class NeonEnv: self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io + self.pageserver_tracing_config = config.pageserver_tracing_config # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { @@ -1262,6 +1288,14 @@ class NeonEnv: if key not in ps_cfg: ps_cfg[key] = value + if self.pageserver_tracing_config is not None: + key, value = self.pageserver_tracing_config.to_config_key_value() + + if key not in ps_cfg: + ps_cfg[key] = value + + ps_cfg[key] = value + # Create a corresponding NeonPageserver object self.pageservers.append( NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]) diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 27ae5507b1..24c856e279 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -110,6 +110,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = ( ".*delaying layer flush by \\S+ for compaction backpressure.*", ".*stalling layer flushes for compaction backpressure.*", ".*layer roll waiting for flush due to compaction backpressure.*", + ".*BatchSpanProcessor.*", ) diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py index 6cbbad4bd9..8874fe663b 100644 --- a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py +++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py @@ -10,6 +10,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + PageserverTracingConfig, PgBin, wait_for_last_flush_lsn, ) @@ -111,6 +112,15 @@ def setup_and_run_pagebench_benchmark( neon_env_builder.pageserver_config_override = ( f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}" ) + + tracing_config = PageserverTracingConfig( + sampling_ratio=(0, 1000), + endpoint="http://localhost:4318/v1/traces", + protocol="http-binary", + timeout="10s", + ) + neon_env_builder.pageserver_tracing_config = tracing_config + ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1] params.update( { "pageserver_config_override.page_cache_size": ( @@ -118,6 +128,7 @@ def setup_and_run_pagebench_benchmark( {"unit": "byte"}, ), "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}), + "pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}), } )