Compare commits

...

10 Commits

Author SHA1 Message Date
Vlad Lazar
c4ce5dda5d tests: enable sampling with zero ration in tput tests 2025-03-07 17:47:50 +01:00
Vlad Lazar
74c555fd06 pageserver: add get page perf tracing
Sampling is done in page service after reading the request from the
wire. A completely separate span hierarchy is used for perf tracing.
The spans live in the `RequestContext` and span relationships are
expressed via the APIs exposed by `RequestContext`.
2025-03-07 17:47:50 +01:00
Vlad Lazar
716bb3c361 pageserver: thread otel dispatch into connection req context 2025-03-07 17:47:50 +01:00
Vlad Lazar
b90b945c02 pageserver: add perf span utilities to request context 2025-03-07 17:47:46 +01:00
Vlad Lazar
ac1159cb33 utils: allow for setting up OTEL tracing subscriber 2025-03-07 17:44:05 +01:00
Vlad Lazar
74a7f68da0 pageserver: add new runtime for performance spans 2025-03-07 17:44:05 +01:00
Vlad Lazar
2acca9fe33 tracing-utils: add perf span tracking utilities 2025-03-07 17:44:05 +01:00
Vlad Lazar
a2b9ff0d40 tracing-utils: allow for explicit export config 2025-03-07 17:44:05 +01:00
Vlad Lazar
a7f60dd5d0 pageserver: add tracing configuration knobs 2025-03-07 17:44:05 +01:00
Vlad Lazar
2a1cbca4e5 pageserver: clean up request context api
This commit removes `RequestContextBuilder::extend`.
Instead, callers should create attached children via
`RequestContextBuilder` interface.
2025-03-07 17:43:58 +01:00
39 changed files with 963 additions and 151 deletions

4
Cargo.lock generated
View File

@@ -4301,6 +4301,7 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"tracing-utils",
"url",
"utils",
"wal_decoder",
@@ -4337,6 +4338,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tracing-utils",
"utils",
]
@@ -7566,6 +7568,7 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"pin-project-lite",
"tokio",
"tracing",
"tracing-opentelemetry",
@@ -7806,6 +7809,7 @@ dependencies = [
"tracing",
"tracing-error",
"tracing-subscriber",
"tracing-utils",
"walkdir",
]

View File

@@ -592,6 +592,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Json,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::OtelEnablement::Disabled,
utils::logging::Output::Stdout,
)?;

View File

@@ -24,7 +24,8 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
.with_writer(std::io::stderr);
// Initialize OpenTelemetry
let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()

View File

@@ -158,6 +158,7 @@ mod reliable_copy_test {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");

View File

@@ -34,6 +34,7 @@ postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
tracing-utils.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -127,6 +127,7 @@ pub struct ConfigToml {
pub load_previous_heatmap: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -184,6 +185,58 @@ pub enum GetVectoredConcurrentIo {
SidecarTask,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Ratio {
pub numerator: usize,
pub denominator: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct OtelExporterConfig {
pub endpoint: String,
pub protocol: OtelExporterProtocol,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterProtocol {
Grpc,
HttpBinary,
HttpJson,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "kebab-case")]
pub struct Tracing {
pub sampling_ratio: Ratio,
pub export_config: OtelExporterConfig,
}
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
fn from(val: &OtelExporterConfig) -> Self {
tracing_utils::ExportConfig {
endpoint: Some(val.endpoint.clone()),
protocol: val.protocol.into(),
timeout: val.timeout,
}
}
}
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
fn from(val: OtelExporterProtocol) -> Self {
match val {
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -529,6 +582,7 @@ impl Default for ConfigToml {
validate_wal_contiguity: None,
load_previous_heatmap: None,
generate_unarchival_heatmap: None,
tracing: None,
}
}
}

View File

@@ -208,6 +208,7 @@ pub(crate) fn ensure_logging_ready() {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");

View File

@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
pin-project-lite.workspace = true
[dev-dependencies]
tracing-subscriber.workspace = true # For examples in docs

View File

@@ -21,7 +21,7 @@
//! .with_writer(std::io::stderr);
//!
//! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
//! let otlp_layer = tracing_utils::init_tracing("my_application").await;
//! let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
//!
//! // Put it all together
//! tracing_subscriber::registry()
@@ -31,13 +31,15 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod http;
pub mod perf_span;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
pub use opentelemetry_otlp::{ExportConfig, Protocol};
use tracing::Subscriber;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
@@ -69,19 +71,28 @@ use tracing_subscriber::registry::LookupSpan;
///
/// This doesn't block, but is marked as 'async' to hint that this must be called in
/// asynchronous execution context.
pub async fn init_tracing<S>(service_name: &str) -> Option<impl Layer<S>>
pub async fn init_tracing<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
return None;
};
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
/// tasks.
pub fn init_tracing_without_runtime<S>(service_name: &str) -> Option<impl Layer<S>>
pub fn init_tracing_without_runtime<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
@@ -112,16 +123,20 @@ where
));
let _guard = runtime.enter();
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
fn init_tracing_internal<S>(service_name: String) -> impl Layer<S>
fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
// Sets up exporter from the OTEL_EXPORTER_* environment variables.
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_export_config(export_config)
.build()
.expect("could not initialize opentelemetry exporter");

View File

@@ -0,0 +1,149 @@
//! Crutch module to work around tracing infrastructure deficiencies
//!
//! We wish to collect granular request spans without impacting performance
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
//!
//! The approach taken by the pageserver crate is to use a completely different
//! span hierarchy for the performance spans. Spans are explicitly stored in
//! the request context and use a different [`tracing::Subscriber`] in order
//! to avoid expensive filtering.
//!
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
//! wishes to juggle different subscribers.
//!
//! In order to work around this, this module provides a [`PerfSpan`] type which
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
//! achieves the correct routing.
//!
//! There's also a modified version of [`tracing::Instrument`] which works with
//! [`PerfSpan`].
use core::{
future::Future,
marker::Sized,
mem::ManuallyDrop,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use tracing::{Dispatch, field, span::Span};
#[derive(Debug, Clone)]
pub struct PerfSpan {
inner: ManuallyDrop<Span>,
dispatch: Dispatch,
}
#[must_use = "once a span has been entered, it should be exited"]
pub struct PerfSpanEntered<'a> {
span: &'a PerfSpan,
}
impl PerfSpan {
pub fn new(span: Span, dispatch: Dispatch) -> Self {
Self {
inner: ManuallyDrop::new(span),
dispatch,
}
}
pub fn record<Q: field::AsField + ?Sized, V: field::Value>(
&self,
field: &Q,
value: V,
) -> &Self {
self.inner.record(field, value);
self
}
pub fn enter(&self) -> PerfSpanEntered {
PerfSpanEntered { span: self }
}
pub fn inner(&self) -> &Span {
&self.inner
}
}
impl Drop for PerfSpan {
fn drop(&mut self) {
// Bring the desired dispatch into scope before explicitly calling
// the span destructor. This routes the span exit to the correct
// [`tracing::Subscriber`].
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.inner) }
}
}
impl Drop for PerfSpanEntered<'_> {
fn drop(&mut self) {
assert!(self.span.inner.id().is_some());
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
self.span.dispatch.exit(&self.span.inner.id().unwrap());
}
}
pub trait PerfInstrument: Sized {
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
PerfInstrumented {
inner: ManuallyDrop::new(self),
span,
}
}
}
pin_project! {
#[project = PerfInstrumentedProj]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PerfInstrumented<T> {
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
// `Span` and executing `ManuallyDrop::drop`.
#[pin]
inner: ManuallyDrop<T>,
span: PerfSpan,
}
impl<T> PinnedDrop for PerfInstrumented<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
let _enter = this.span.enter();
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
// different from wrapping `T` in `Option` and calling
// `Pin::set(&mut this.inner, None)`, except avoiding
// additional memory overhead.
// 2. `ManuallyDrop::drop()` is safe, because
// `PinnedDrop::drop()` is guaranteed to be called only
// once.
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
}
}
}
impl<'a, T> PerfInstrumentedProj<'a, T> {
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
/// the wrapped type.
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
// and `inner` is valid, because `ManuallyDrop::drop` is called
// only inside `Drop` of the `Instrumented`.
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
(self.span, inner)
}
}
impl<T: Future> Future for PerfInstrumented<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (span, inner) = self.project().span_and_inner_pin_mut();
let _enter = span.enter();
inner.poll(cx)
}
}
impl<T: Sized> PerfInstrument for T {}

View File

@@ -42,6 +42,7 @@ toml_edit = { workspace = true, features = ["serde"] }
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
tracing-utils.workspace = true
rand.workspace = true
scopeguard.workspace = true
strum.workspace = true

View File

@@ -7,7 +7,9 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::Dispatch;
use tracing::info;
use tracing::level_filters::LevelFilter;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -125,6 +127,15 @@ pub enum TracingErrorLayerEnablement {
EnableWithRustLogFilter,
}
pub enum OtelEnablement {
Disabled,
Enabled {
service_name: String,
export_config: tracing_utils::ExportConfig,
runtime: &'static tokio::runtime::Runtime,
},
}
/// Where the logging should output to.
#[derive(Clone, Copy)]
pub enum Output {
@@ -132,11 +143,24 @@ pub enum Output {
Stderr,
}
pub struct OtelGuard {
pub dispatch: Dispatch,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
tracing_utils::shutdown_tracing();
}
}
pub const PERF_TRACE_TARGET: &str = "P";
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
otel_enablement: OtelEnablement,
output: Output,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<OtelGuard>> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -165,6 +189,7 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
@@ -175,7 +200,26 @@ pub fn init(
TracingErrorLayerEnablement::Disabled => r.init(),
}
Ok(())
let otel_subscriber = match otel_enablement {
OtelEnablement::Disabled => None,
OtelEnablement::Enabled {
service_name,
export_config,
runtime,
} => {
let otel_layer = runtime
.block_on(tracing_utils::init_tracing(&service_name, export_config))
.with_filter(LevelFilter::INFO);
let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
let otel_dispatch = Dispatch::new(otel_subscriber);
Some(otel_dispatch)
}
};
let otel_guard = otel_subscriber.map(|dispatch| OtelGuard { dispatch });
Ok(otel_guard)
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -66,6 +66,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
tracing-utils.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true

View File

@@ -10,9 +10,10 @@ pub(crate) fn setup_logging() {
logging::init(
logging::LogFormat::Test,
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -117,6 +117,7 @@ async fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)?;

View File

@@ -35,6 +35,7 @@ fn main() {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();

View File

@@ -21,7 +21,8 @@ use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, OTEL_RUNTIME,
WALRECEIVER_RUNTIME,
};
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
use pageserver::{
@@ -36,7 +37,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::crashsafe::syncfs;
use utils::logging::TracingErrorLayerEnablement;
use utils::logging::{OtelGuard, TracingErrorLayerEnablement};
use utils::sentry_init::init_sentry;
use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
@@ -110,12 +111,27 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
let otel_enablement = match &conf.tracing {
Some(cfg) => utils::logging::OtelEnablement::Enabled {
service_name: "pageserver".to_string(),
export_config: (&cfg.export_config).into(),
runtime: *OTEL_RUNTIME,
},
None => utils::logging::OtelEnablement::Disabled,
};
let otel_guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
otel_enablement,
logging::Output::Stdout,
)?;
if otel_guard.is_some() {
info!(?conf.tracing, "starting with OTEL tracing enabled");
}
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -189,7 +205,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -287,6 +303,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
@@ -634,13 +651,21 @@ fn start_pageserver(
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
let page_service = page_service::spawn(
conf,
tenant_manager.clone(),
pg_auth,
perf_trace_dispatch,
{
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
);
// All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move {

View File

@@ -201,6 +201,8 @@ pub struct PageServerConf {
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
pub generate_unarchival_heatmap: bool,
pub tracing: Option<pageserver_api::config::Tracing>,
}
/// Token for authentication to safekeepers
@@ -367,6 +369,7 @@ impl PageServerConf {
validate_wal_contiguity,
load_previous_heatmap,
generate_unarchival_heatmap,
tracing,
} = config_toml;
let mut conf = PageServerConf {
@@ -412,6 +415,7 @@ impl PageServerConf {
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
tracing,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -476,6 +480,17 @@ impl PageServerConf {
);
}
if let Some(tracing_config) = conf.tracing.as_ref() {
let ratio = &tracing_config.sampling_ratio;
ensure!(
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
format!(
"Invalid sampling ratio: {}/{}",
ratio.numerator, ratio.denominator
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
.map_err(anyhow::Error::msg)
.with_context(|| {

View File

@@ -89,16 +89,38 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use futures::FutureExt;
use futures::future::BoxFuture;
use std::future::Future;
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
use tracing::{Dispatch, Span};
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Debug)]
#[derive(Clone)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
read_path_debug: bool,
perf_span: Option<PerfSpan>,
perf_span_dispatch: Option<Dispatch>,
}
impl std::fmt::Debug for RequestContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RequestContext")
.field("task_kind", &self.task_kind)
.field("download_behavior", &self.download_behavior)
.field("access_stats_behavior", &self.access_stats_behavior)
.field("page_content_kind", &self.page_content_kind)
.field("read_path_debug", &self.read_path_debug)
// perf_span and perf_span_dispatch are omitted on purpose
.finish()
}
}
/// The kind of access to the page cache.
@@ -157,24 +179,23 @@ impl RequestContextBuilder {
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
perf_span: None,
perf_span_dispatch: None,
},
}
}
pub fn extend(original: &RequestContext) -> Self {
pub fn from(original: &RequestContext) -> Self {
Self {
// This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it.
inner: RequestContext {
task_kind: original.task_kind,
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
},
inner: original.clone(),
}
}
pub fn task_kind(mut self, b: TaskKind) -> Self {
self.inner.task_kind = b;
self
}
/// Configure the DownloadBehavior of the context: whether to
/// download missing layers, and/or warn on the download.
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
@@ -199,7 +220,52 @@ impl RequestContextBuilder {
self
}
pub fn build(self) -> RequestContext {
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
self.inner.perf_span_dispatch = dispatch;
self
}
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce() -> Span,
{
assert!(self.inner.perf_span.is_none());
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
self
}
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce(&Span) -> Span,
{
if let Some(ref perf_span) = self.inner.perf_span {
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
}
self
}
pub fn root(self) -> RequestContext {
self.inner
}
pub fn attached_child(self) -> RequestContext {
self.inner
}
pub fn detached_child(self) -> RequestContext {
self.inner
}
}
@@ -220,7 +286,7 @@ impl RequestContext {
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::new(task_kind)
.download_behavior(download_behavior)
.build()
.root()
}
/// Create a detached child context for a task that may outlive `self`.
@@ -241,7 +307,10 @@ impl RequestContext {
///
/// We could make new calls to this function fail if `self` is already canceled.
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
self.child_impl(task_kind, download_behavior)
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.detached_child()
}
/// Create a child of context `self` for a task that shall not outlive `self`.
@@ -265,7 +334,7 @@ impl RequestContext {
/// The method to wait for child tasks would return an error, indicating
/// that the child task was not started because the context was canceled.
pub fn attached_child(&self) -> Self {
self.child_impl(self.task_kind(), self.download_behavior())
RequestContextBuilder::from(self).attached_child()
}
/// Use this function when you should be creating a child context using
@@ -280,10 +349,6 @@ impl RequestContext {
Self::new(task_kind, download_behavior)
}
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
}
pub fn task_kind(&self) -> TaskKind {
self.task_kind
}
@@ -303,4 +368,51 @@ impl RequestContext {
pub(crate) fn read_path_debug(&self) -> bool {
self.read_path_debug
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());
}
}
pub(crate) fn maybe_instrument<'a, Fut, Fn>(
&self,
future: Fut,
make_span: Fn,
) -> BoxFuture<'a, Fut::Output>
where
Fut: Future + Send + 'a,
Fn: FnOnce(&Span) -> Span,
{
match &self.perf_span {
Some(perf_span) => {
assert!(self.perf_span_dispatch.is_some());
let dispatcher = self.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
future.instrument(new_perf_span).boxed()
}
None => future.boxed(),
}
}
pub(crate) fn perf_span_record<
Q: tracing::field::AsField + ?Sized,
V: tracing::field::Value,
>(
&self,
field: &Q,
value: V,
) {
if let Some(span) = &self.perf_span {
span.record(field, value);
}
}
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
}

View File

@@ -2579,9 +2579,10 @@ async fn getpage_at_lsn_handler_inner(
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Enable read path debugging
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
.download_behavior(DownloadBehavior::Download)
.read_path_debug(true)
.root();
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
// Use last_record_lsn if no lsn is provided

View File

@@ -17,7 +17,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
PageServiceProtocolPipelinedExecutionStrategy, Tracing,
};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{
@@ -36,6 +36,7 @@ use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use rand::Rng;
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
@@ -44,6 +45,7 @@ use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
use utils::failpoint_support;
use utils::id::{TenantId, TimelineId};
use utils::logging::PERF_TRACE_TARGET;
use utils::logging::log_slow;
use utils::lsn::Lsn;
use utils::simple_rcu::RcuReadGuard;
@@ -53,7 +55,7 @@ use utils::sync::spsc_fold;
use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
};
@@ -99,6 +101,7 @@ pub fn spawn(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
pg_auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
) -> Listener {
let cancel = CancellationToken::new();
@@ -116,6 +119,7 @@ pub fn spawn(
conf,
tenant_manager,
pg_auth,
perf_trace_dispatch,
tcp_listener,
conf.pg_auth_type,
conf.page_service_pipelining.clone(),
@@ -172,6 +176,7 @@ pub async fn libpq_listener_main(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
pipelining_config: PageServicePipeliningConfig,
@@ -204,8 +209,12 @@ pub async fn libpq_listener_main(
// Connection established. Spawn a new task to handle it.
debug!("accepted connection from {}", peer_addr);
let local_auth = auth.clone();
let connection_ctx = listener_ctx
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
.task_kind(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch.clone())
.detached_child();
connection_handler_tasks.spawn(page_service_conn_main(
conf,
tenant_manager.clone(),
@@ -600,6 +609,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
ctx: RequestContext,
}
#[cfg(feature = "testing")]
@@ -736,6 +746,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
tracing_config: Option<&Tracing>,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -895,10 +906,55 @@ impl PageServerHandler {
}
let key = rel_block_to_key(req.rel, req.blkno);
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.await
{
let sampled = match tracing_config {
Some(conf) => {
let ratio = &conf.sampling_ratio;
if ratio.numerator == 0 {
false
} else {
rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
}
}
None => false,
};
let get_page_context = if sampled {
RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid,
key = %key)
})
.attached_child()
} else {
ctx.attached_child()
};
let res = get_page_context
.maybe_instrument(
timeline_handles.get(tenant_id, timeline_id, ShardSelector::Page(key)),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"SHARD_SELECTION",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await;
let shard = match res {
Ok(tl) => tl,
Err(e) => {
let span = mkspan!(before shard routing);
@@ -927,24 +983,59 @@ impl PageServerHandler {
};
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
)
.await?;
// TODO(vlad): why does this not show up?
get_page_context.perf_span_record(
"shard",
tracing::field::display(shard.get_shard_identity().shard_slug()),
);
let timer = get_page_context
.maybe_instrument(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"THROTTLE",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
let res = get_page_context
.maybe_instrument(
Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
ctx,
),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"WAIT_LSN",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await;
let effective_request_lsn = match res {
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
@@ -954,7 +1045,11 @@ impl PageServerHandler {
span,
shard: shard.downgrade(),
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
ctx: get_page_context
}],
}
}
#[cfg(feature = "testing")]
@@ -1486,12 +1581,15 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let cancel = self.cancel.clone();
let tracing_config = self.conf.tracing.clone();
let err = loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel,
ctx,
protocol_version,
@@ -1625,6 +1723,8 @@ impl PageServerHandler {
// Batcher
//
let tracing_config = self.conf.tracing.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1638,6 +1738,7 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel_batcher,
&ctx,
protocol_version,
@@ -1976,7 +2077,9 @@ impl PageServerHandler {
let results = timeline
.get_rel_page_at_lsn_batched(
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
requests
.iter()
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
effective_lsn,
io_concurrency,
ctx,

View File

@@ -31,15 +31,16 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::logging::PERF_TRACE_TARGET;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::RequestContext;
use crate::context::{RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -209,7 +210,9 @@ impl Timeline {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
effective_lsn,
io_concurrency.clone(),
ctx,
@@ -248,7 +251,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
effective_lsn: Lsn,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
@@ -262,8 +265,11 @@ impl Timeline {
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
BTreeMap::default();
let mut perf_instrument = false;
for (response_slot_idx, (tag, blknum, req_ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -273,6 +279,7 @@ impl Timeline {
continue;
}
// TODO: perf span
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
.await
@@ -297,8 +304,12 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
if req_ctx.has_perf_span() {
perf_instrument = true;
}
let key_slots = keys_slots.entry(key).or_default();
key_slots.push(response_slot_idx);
key_slots.push((response_slot_idx, req_ctx));
}
let keyspace = {
@@ -314,16 +325,36 @@ impl Timeline {
acc.to_keyspace()
};
match self
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
.await
{
let get_vectored_ctx = match perf_instrument {
true => RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_VECTORED",
tenant_id = %self.tenant_shard_id.tenant_id,
timeline_id = %self.timeline_id,
lsn = %effective_lsn,
shard = %self.tenant_shard_id.shard_slug(),
)
})
.attached_child(),
false => ctx.attached_child(),
};
let res = get_vectored_ctx
.maybe_instrument(
self.get_vectored(keyspace, effective_lsn, io_concurrency, &get_vectored_ctx),
|current_perf_span| current_perf_span.clone(),
)
.await;
match res {
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
let first_slot = key_slots.next().unwrap();
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
for slot in key_slots {
for (slot, req_ctx) in key_slots {
let clone = match &res {
Ok(buf) => Ok(buf.clone()),
Err(err) => Err(match err {
@@ -341,17 +372,19 @@ impl Timeline {
};
result_slots[slot].write(clone);
req_ctx.perf_follows_from(&get_vectored_ctx);
slots_filled += 1;
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(&get_vectored_ctx);
slots_filled += 1;
}
}
Err(err) => {
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
for slot in keys_slots.values().flatten() {
for (slot, req_ctx) in keys_slots.values().flatten() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
let err = match &err {
@@ -383,6 +416,7 @@ impl Timeline {
}
};
req_ctx.perf_follows_from(&get_vectored_ctx);
result_slots[*slot].write(err);
}

View File

@@ -217,9 +217,10 @@ pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
pageserver_runtime!(OTEL_RUNTIME, "open telemetry worker");
// Bump this number when adding a new pageserver_runtime!
// SAFETY: it's obviously correct
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(5) };
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

View File

@@ -5718,9 +5718,10 @@ pub(crate) mod harness {
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -13,7 +13,6 @@ pub mod merge_iterator;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -34,7 +33,8 @@ use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use tracing::{Instrument, trace};
use tracing::{Instrument, info_span, trace};
use utils::logging::PERF_TRACE_TARGET;
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;
@@ -43,7 +43,7 @@ use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -874,13 +874,51 @@ impl ReadableLayer {
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
let persistent_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
persistent_context
.maybe_instrument(
layer.get_values_reconstruct_data(
keyspace,
lsn_range,
reconstruct_state,
&persistent_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
}
ReadableLayer::InMemoryLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
let in_mem_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
in_mem_context
.maybe_instrument(
layer.get_values_reconstruct_data(
keyspace,
lsn_range,
reconstruct_state,
&in_mem_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
}
}

View File

@@ -896,9 +896,9 @@ impl DeltaLayerInner {
where
Reader: BlockReader + Clone,
{
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
.attached_child();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
all_keys.push(entry);
true
},
&RequestContextBuilder::extend(ctx)
&RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
.attached_child(),
)
.await?;
if let Some(last) = all_keys.last_mut() {

View File

@@ -481,9 +481,9 @@ impl ImageLayerInner {
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build();
.attached_child();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;

View File

@@ -420,9 +420,9 @@ impl InMemoryLayer {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
.attached_child();
let inner = self.inner.read().await;

View File

@@ -8,9 +8,10 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use tracing::Instrument;
use tracing::{Instrument, info_span};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::logging::PERF_TRACE_TARGET;
use utils::lsn::Lsn;
use utils::sync::{gate, heavier_once_cell};
@@ -324,16 +325,29 @@ impl Layer {
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let downloaded =
self.0
.get_or_maybe_download(true, ctx)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let get_layer_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_LAYER",
)
})
.attached_child();
let downloaded = get_layer_context
.maybe_instrument(
self.0.get_or_maybe_download(true, &get_layer_context),
|crnt_perf_context| crnt_perf_context.clone(),
)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let this = ResidentLayer {
downloaded: downloaded.clone(),
owner: self.clone(),
@@ -341,9 +355,29 @@ impl Layer {
self.record_access(ctx);
downloaded
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
let visit_layer_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"VISIT_LAYER",
)
})
.attached_child();
visit_layer_context
.maybe_instrument(
downloaded
.get_values_reconstruct_data(
this,
keyspace,
lsn_range,
reconstruct_data,
&visit_layer_context,
)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
.map_err(|err| match err {
GetVectoredError::Other(err) => GetVectoredError::Other(
@@ -1045,15 +1079,36 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
let download_ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
async move {
tracing::info!(%reason, "downloading on-demand");
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, download_ctx)
let res = download_ctx
.maybe_instrument(
self.download_init_and_wait(timeline, permit, download_ctx.attached_child()),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok(res)
}
@@ -1720,9 +1775,9 @@ impl DownloadedLayer {
);
let res = if owner.desc.is_delta {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
.build();
.attached_child();
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,
owner.desc.timeline_id,
@@ -1738,9 +1793,9 @@ impl DownloadedLayer {
.await
.map(LayerKind::Delta)
} else {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
.build();
.attached_child();
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,

View File

@@ -644,9 +644,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
let ctx = RequestContextBuilder::from(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
.attached_child();
let layer = {
let mut layers = {
@@ -729,9 +729,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
let ctx = RequestContextBuilder::from(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
.attached_child();
let layer = {
let mut layers = {

View File

@@ -67,6 +67,7 @@ use tracing::*;
use utils::generation::Generation;
use utils::guard_arc_swap::GuardArcSwap;
use utils::id::TimelineId;
use utils::logging::PERF_TRACE_TARGET;
use utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use utils::postgres_client::PostgresClientProtocol;
use utils::rate_limit::RateLimit;
@@ -94,7 +95,7 @@ use super::{
};
use crate::aux_file::AuxFileSizeEstimator;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
@@ -1274,9 +1275,28 @@ impl Timeline {
};
reconstruct_state.read_path = read_path;
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
let plan_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO",
)
})
.attached_child();
let traversal_res: Result<(), _> = plan_context
.maybe_instrument(
self.get_vectored_reconstruct_data(
keyspace.clone(),
lsn,
reconstruct_state,
&plan_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
@@ -1290,14 +1310,45 @@ impl Timeline {
let layers_visited = reconstruct_state.get_layers_visited();
let execute_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT",
)
})
.attached_child();
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let execute_key_context = RequestContextBuilder::from(&execute_context)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT_KEY",
key = %key,
)
})
.attached_child();
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let converted = match state.collect_pending_ios().await {
let res = execute_key_context
.maybe_instrument(state.collect_pending_ios(), |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"EXECUTE_IO",
)
})
.await;
let converted = match res {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
@@ -1314,16 +1365,31 @@ impl Timeline {
"{converted:?}"
);
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
let walredo_deltas = converted.num_deltas();
let walredo_res = execute_key_context
.maybe_instrument(
walredo_self.reconstruct_value(key, lsn, converted),
|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WALREDO",
deltas = %walredo_deltas,
)
},
)
.await;
(key, walredo_res)
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
let results = execute_context
.maybe_instrument(
futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>(),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await;
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -3795,18 +3861,34 @@ impl Timeline {
return Err(GetVectoredError::Cancelled);
}
let plan_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO_TIMELINE",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
)
})
.attached_child();
let TimelineVisitOutcome {
completed_keyspace: completed,
image_covered_keyspace,
} = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
ctx,
)
.await?;
} = plan_context
.maybe_instrument(
Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
&plan_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
keyspace.remove_overlapping_with(&completed);
@@ -3850,8 +3932,26 @@ impl Timeline {
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
let get_ancestor_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_ANCESTOR",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
ancestor = %ancestor_timeline.timeline_id,
ancestor_lsn = %timeline.ancestor_lsn
)
})
.attached_child();
timeline_owned = get_ancestor_context
.maybe_instrument(
timeline.get_ready_ancestor_timeline(ancestor_timeline, &get_ancestor_context),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
timeline = &*timeline_owned;
};
@@ -7184,9 +7284,9 @@ mod tests {
eprintln!("Downloading {layer} and re-generating heatmap");
let ctx = &RequestContextBuilder::extend(&ctx)
let ctx = &RequestContextBuilder::from(&ctx)
.download_behavior(crate::context::DownloadBehavior::Download)
.build();
.attached_child();
let _resident = layer
.download_and_keep_resident(ctx)

View File

@@ -928,9 +928,9 @@ impl Timeline {
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
let image_ctx = RequestContextBuilder::from(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
.attached_child();
let mut partitioning = dense_partitioning;
partitioning

View File

@@ -46,7 +46,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let otlp_layer =
tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer::new(

View File

@@ -255,6 +255,7 @@ async fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();

View File

@@ -643,6 +643,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();

View File

@@ -232,6 +232,7 @@ fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
utils::logging::OtelEnablement::Disabled,
logging::Output::Stdout,
)?;

View File

@@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum):
raise ValueError(f"Unknown protocol type: {proto}")
@dataclass
class PageserverTracingConfig:
sampling_ratio: tuple[int, int]
endpoint: str
protocol: str
timeout: str
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"sampling-ratio": {
"numerator": self.sampling_ratio[0],
"denominator": self.sampling_ratio[1],
},
"export-config": {
"endpoint": self.endpoint,
"protocol": self.protocol,
"timeout": self.timeout,
},
}
return ("tracing", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -425,6 +447,7 @@ class NeonEnvBuilder:
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -468,6 +491,8 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
)
@@ -1113,6 +1138,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1225,6 +1251,14 @@ class NeonEnv:
if key not in ps_cfg:
ps_cfg[key] = value
if self.pageserver_tracing_config is not None:
key, value = self.pageserver_tracing_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])

View File

@@ -106,6 +106,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*delaying layer flush by \\S+ for compaction backpressure.*",
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
)

View File

@@ -10,6 +10,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverTracingConfig,
PgBin,
wait_for_last_flush_lsn,
)
@@ -113,6 +114,15 @@ def setup_and_run_pagebench_benchmark(
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
tracing_config = PageserverTracingConfig(
sampling_ratio=(0, 1000),
endpoint="http://localhost:4318/v1/traces",
protocol="http-binary",
timeout="10s",
)
neon_env_builder.pageserver_tracing_config = tracing_config
ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1]
params.update(
{
"pageserver_config_override.page_cache_size": (
@@ -120,6 +130,7 @@ def setup_and_run_pagebench_benchmark(
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
"pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
}
)