Compare commits

...

14 Commits

Author SHA1 Message Date
Heikki Linnakangas
4203cef955 PoC: pass opentelemetry trace id to the pageserver basebackup command
This gives cross-service tracing of the basebackup operation that's
part of compute start.
2025-03-11 15:30:35 +02:00
Vlad Lazar
2294d52fa7 tests: enable sampling with zero ration in tput tests 2025-03-10 19:21:25 +01:00
Vlad Lazar
2db10154eb pageserver: add get page perf tracing
Sampling is done in page service after reading the request from the
wire. A completely separate span hierarchy is used for perf tracing.
The spans live in the `RequestContext` and span relationships are
expressed via the APIs exposed by `RequestContext`.
2025-03-10 19:21:25 +01:00
Vlad Lazar
cd640108f4 pageserver: thread otel dispatch into connection req context 2025-03-10 19:21:25 +01:00
Vlad Lazar
e92794a692 pageserver: set up OTEL tracing infra when config says so 2025-03-10 19:21:24 +01:00
Vlad Lazar
17e2bff4d5 pageserver: add perf span utilities to request context 2025-03-10 19:20:12 +01:00
Vlad Lazar
135e89b34f tracing-utils: add perf span tracking utilities 2025-03-10 19:20:12 +01:00
Vlad Lazar
cadfe33c86 pageserver: add new runtime for performance spans 2025-03-10 19:20:12 +01:00
Vlad Lazar
ff9bb05f17 pageserver: add tracing configuration knobs 2025-03-10 19:20:12 +01:00
Vlad Lazar
e8a00a3464 pageserver: clean up request context api
This commit removes `RequestContextBuilder::extend`.
Instead, callers should create attached children via
`RequestContextBuilder` interface.
2025-03-10 19:20:12 +01:00
Vlad Lazar
91aa9aef45 review: separate setting up otel tracing infra 2025-03-10 19:19:25 +01:00
Vlad Lazar
1f3d655708 review: update doc comment 2025-03-10 19:18:48 +01:00
Vlad Lazar
cdb95d1a35 utils: optionally enable otel tracing in common logging utils
This patch augments `utils::logging::init` with the ability to set up
OTEL tracing infrastructure. The end goal is for the pageserver to use
this in order to export perf traces.

Note that an entirely different tracing subscriber is used. This is to
avoid interference with the existing tracing set-up.

For now, no service uses this functionality.
2025-03-10 14:56:21 +01:00
Vlad Lazar
09a9f6cf3c tracing-utils: allow for explicit export config 2025-03-10 14:56:21 +01:00
33 changed files with 1006 additions and 149 deletions

6
Cargo.lock generated
View File

@@ -4259,6 +4259,7 @@ dependencies = [
"num-traits",
"num_cpus",
"once_cell",
"opentelemetry",
"pageserver_api",
"pageserver_client",
"pageserver_compaction",
@@ -4301,6 +4302,8 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-utils",
"url",
"utils",
"uuid",
@@ -4338,6 +4341,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tracing-utils",
"utils",
]
@@ -7597,6 +7601,7 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"pin-project-lite",
"tokio",
"tracing",
"tracing-opentelemetry",
@@ -7837,6 +7842,7 @@ dependencies = [
"tracing",
"tracing-error",
"tracing-subscriber",
"tracing-utils",
"walkdir",
]

View File

@@ -855,6 +855,23 @@ impl ComputeNode {
info!("Storage auth token not set");
}
let mut traceparams = HashMap::new();
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let cx: opentelemetry::Context = tracing::Span::current().context();
let span = cx.span();
let span_context = span.span_context();
info!("span cxt: is_valid: {} is_sampled: {} trace_id: {}",
span_context.is_valid(), span_context.is_sampled(), span_context.trace_id());
TraceContextPropagator::new().inject_context(&cx, &mut traceparams);
config.options(&serde_json::to_string(&traceparams).expect("failed to serialize otel trace params"));
info!("getting basebackup with config {:?}", config);
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;

View File

@@ -24,7 +24,8 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
.with_writer(std::io::stderr);
// Initialize OpenTelemetry
let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()

View File

@@ -34,6 +34,7 @@ postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
tracing-utils.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -127,6 +127,7 @@ pub struct ConfigToml {
pub load_previous_heatmap: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -184,6 +185,58 @@ pub enum GetVectoredConcurrentIo {
SidecarTask,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Ratio {
pub numerator: usize,
pub denominator: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct OtelExporterConfig {
pub endpoint: String,
pub protocol: OtelExporterProtocol,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterProtocol {
Grpc,
HttpBinary,
HttpJson,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "kebab-case")]
pub struct Tracing {
pub sampling_ratio: Ratio,
pub export_config: OtelExporterConfig,
}
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
fn from(val: &OtelExporterConfig) -> Self {
tracing_utils::ExportConfig {
endpoint: Some(val.endpoint.clone()),
protocol: val.protocol.into(),
timeout: val.timeout,
}
}
}
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
fn from(val: OtelExporterProtocol) -> Self {
match val {
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -529,6 +582,7 @@ impl Default for ConfigToml {
validate_wal_contiguity: None,
load_previous_heatmap: None,
generate_unarchival_heatmap: None,
tracing: None,
}
}
}

View File

@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
pin-project-lite.workspace = true
[dev-dependencies]
tracing-subscriber.workspace = true # For examples in docs

View File

@@ -21,7 +21,7 @@
//! .with_writer(std::io::stderr);
//!
//! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
//! let otlp_layer = tracing_utils::init_tracing("my_application").await;
//! let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
//!
//! // Put it all together
//! tracing_subscriber::registry()
@@ -31,13 +31,15 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod http;
pub mod perf_span;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
pub use opentelemetry_otlp::{ExportConfig, Protocol};
use tracing::Subscriber;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
@@ -69,19 +71,28 @@ use tracing_subscriber::registry::LookupSpan;
///
/// This doesn't block, but is marked as 'async' to hint that this must be called in
/// asynchronous execution context.
pub async fn init_tracing<S>(service_name: &str) -> Option<impl Layer<S>>
pub async fn init_tracing<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
return None;
};
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
/// tasks.
pub fn init_tracing_without_runtime<S>(service_name: &str) -> Option<impl Layer<S>>
pub fn init_tracing_without_runtime<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
@@ -112,16 +123,22 @@ where
));
let _guard = runtime.enter();
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
fn init_tracing_internal<S>(service_name: String) -> impl Layer<S>
fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
// Sets up exporter from the OTEL_EXPORTER_* environment variables.
// Sets up exporter from the provided [`ExportConfig`] parameter.
// If the endpoint is not specified, it is loaded from the
// OTEL_EXPORTER_OTLP_ENDPOINT environment variable.
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_export_config(export_config)
.build()
.expect("could not initialize opentelemetry exporter");

View File

@@ -0,0 +1,149 @@
//! Crutch module to work around tracing infrastructure deficiencies
//!
//! We wish to collect granular request spans without impacting performance
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
//!
//! The approach taken by the pageserver crate is to use a completely different
//! span hierarchy for the performance spans. Spans are explicitly stored in
//! the request context and use a different [`tracing::Subscriber`] in order
//! to avoid expensive filtering.
//!
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
//! wishes to juggle different subscribers.
//!
//! In order to work around this, this module provides a [`PerfSpan`] type which
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
//! achieves the correct routing.
//!
//! There's also a modified version of [`tracing::Instrument`] which works with
//! [`PerfSpan`].
use core::{
future::Future,
marker::Sized,
mem::ManuallyDrop,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use tracing::{Dispatch, field, span::Span};
#[derive(Debug, Clone)]
pub struct PerfSpan {
inner: ManuallyDrop<Span>,
dispatch: Dispatch,
}
#[must_use = "once a span has been entered, it should be exited"]
pub struct PerfSpanEntered<'a> {
span: &'a PerfSpan,
}
impl PerfSpan {
pub fn new(span: Span, dispatch: Dispatch) -> Self {
Self {
inner: ManuallyDrop::new(span),
dispatch,
}
}
pub fn record<Q: field::AsField + ?Sized, V: field::Value>(
&self,
field: &Q,
value: V,
) -> &Self {
self.inner.record(field, value);
self
}
pub fn enter(&self) -> PerfSpanEntered {
PerfSpanEntered { span: self }
}
pub fn inner(&self) -> &Span {
&self.inner
}
}
impl Drop for PerfSpan {
fn drop(&mut self) {
// Bring the desired dispatch into scope before explicitly calling
// the span destructor. This routes the span exit to the correct
// [`tracing::Subscriber`].
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.inner) }
}
}
impl Drop for PerfSpanEntered<'_> {
fn drop(&mut self) {
assert!(self.span.inner.id().is_some());
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
self.span.dispatch.exit(&self.span.inner.id().unwrap());
}
}
pub trait PerfInstrument: Sized {
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
PerfInstrumented {
inner: ManuallyDrop::new(self),
span,
}
}
}
pin_project! {
#[project = PerfInstrumentedProj]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PerfInstrumented<T> {
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
// `Span` and executing `ManuallyDrop::drop`.
#[pin]
inner: ManuallyDrop<T>,
span: PerfSpan,
}
impl<T> PinnedDrop for PerfInstrumented<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
let _enter = this.span.enter();
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
// different from wrapping `T` in `Option` and calling
// `Pin::set(&mut this.inner, None)`, except avoiding
// additional memory overhead.
// 2. `ManuallyDrop::drop()` is safe, because
// `PinnedDrop::drop()` is guaranteed to be called only
// once.
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
}
}
}
impl<'a, T> PerfInstrumentedProj<'a, T> {
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
/// the wrapped type.
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
// and `inner` is valid, because `ManuallyDrop::drop` is called
// only inside `Drop` of the `Instrumented`.
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
(self.span, inner)
}
}
impl<T: Future> Future for PerfInstrumented<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (span, inner) = self.project().span_and_inner_pin_mut();
let _enter = span.enter();
inner.poll(cx)
}
}
impl<T: Sized> PerfInstrument for T {}

View File

@@ -42,6 +42,7 @@ toml_edit = { workspace = true, features = ["serde"] }
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
tracing-utils.workspace = true
rand.workspace = true
scopeguard.workspace = true
strum.workspace = true

View File

@@ -7,7 +7,11 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::Dispatch;
use tracing::info;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -125,6 +129,15 @@ pub enum TracingErrorLayerEnablement {
EnableWithRustLogFilter,
}
pub enum OtelEnablement {
Disabled,
Enabled {
service_name: String,
export_config: tracing_utils::ExportConfig,
runtime: &'static tokio::runtime::Runtime,
},
}
/// Where the logging should output to.
#[derive(Clone, Copy)]
pub enum Output {
@@ -132,6 +145,16 @@ pub enum Output {
Stderr,
}
pub struct OtelGuard {
pub dispatch: Dispatch,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
tracing_utils::shutdown_tracing();
}
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
@@ -144,6 +167,9 @@ pub fn init(
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// Initialize OpenTelemetry
let otlp_layer =
tracing_utils::init_tracing_without_runtime("pageserver-x", tracing_utils::ExportConfig::default());
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
@@ -165,6 +191,8 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(otlp_layer);
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
@@ -178,6 +206,29 @@ pub fn init(
Ok(())
}
pub fn init_otel_tracing(otel_enablement: OtelEnablement) -> anyhow::Result<Option<OtelGuard>> {
let otel_subscriber = match otel_enablement {
OtelEnablement::Disabled => None,
OtelEnablement::Enabled {
service_name,
export_config,
runtime,
} => {
let otel_layer = runtime
.block_on(tracing_utils::init_tracing(&service_name, export_config))
.with_filter(LevelFilter::INFO);
let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
let otel_dispatch = Dispatch::new(otel_subscriber);
Some(otel_dispatch)
}
};
let otel_guard = otel_subscriber.map(|dispatch| OtelGuard { dispatch });
Ok(otel_guard)
}
/// Disable the default rust panic hook by using `set_hook`.
///
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after

View File

@@ -39,6 +39,7 @@ nix.workspace = true
num_cpus.workspace = true
num-traits.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
@@ -66,6 +67,8 @@ tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-utils.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true

View File

@@ -12,7 +12,7 @@ pub(crate) fn setup_logging() {
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -21,7 +21,8 @@ use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, OTEL_RUNTIME,
WALRECEIVER_RUNTIME,
};
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
use pageserver::{
@@ -36,7 +37,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::crashsafe::syncfs;
use utils::logging::TracingErrorLayerEnablement;
use utils::logging::{OtelGuard, TracingErrorLayerEnablement};
use utils::sentry_init::init_sentry;
use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
@@ -110,12 +111,28 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,
)?;
let otel_enablement = match &conf.tracing {
Some(cfg) => utils::logging::OtelEnablement::Enabled {
service_name: "pageserver".to_string(),
export_config: (&cfg.export_config).into(),
runtime: *OTEL_RUNTIME,
},
None => utils::logging::OtelEnablement::Disabled,
};
let otel_guard = logging::init_otel_tracing(otel_enablement)?;
if otel_guard.is_some() {
info!(?conf.tracing, "starting with OTEL tracing enabled");
}
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -189,7 +206,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -287,6 +304,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
@@ -634,13 +652,21 @@ fn start_pageserver(
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
let page_service = page_service::spawn(
conf,
tenant_manager.clone(),
pg_auth,
perf_trace_dispatch,
{
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
);
// All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move {

View File

@@ -201,6 +201,8 @@ pub struct PageServerConf {
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
pub generate_unarchival_heatmap: bool,
pub tracing: Option<pageserver_api::config::Tracing>,
}
/// Token for authentication to safekeepers
@@ -367,6 +369,7 @@ impl PageServerConf {
validate_wal_contiguity,
load_previous_heatmap,
generate_unarchival_heatmap,
tracing,
} = config_toml;
let mut conf = PageServerConf {
@@ -412,6 +415,7 @@ impl PageServerConf {
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
tracing,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -476,6 +480,17 @@ impl PageServerConf {
);
}
if let Some(tracing_config) = conf.tracing.as_ref() {
let ratio = &tracing_config.sampling_ratio;
ensure!(
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
format!(
"Invalid sampling ratio: {}/{}",
ratio.numerator, ratio.denominator
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
.map_err(anyhow::Error::msg)
.with_context(|| {

View File

@@ -89,16 +89,38 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use futures::FutureExt;
use futures::future::BoxFuture;
use std::future::Future;
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
use tracing::{Dispatch, Span};
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Debug)]
#[derive(Clone)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
read_path_debug: bool,
perf_span: Option<PerfSpan>,
perf_span_dispatch: Option<Dispatch>,
}
impl std::fmt::Debug for RequestContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RequestContext")
.field("task_kind", &self.task_kind)
.field("download_behavior", &self.download_behavior)
.field("access_stats_behavior", &self.access_stats_behavior)
.field("page_content_kind", &self.page_content_kind)
.field("read_path_debug", &self.read_path_debug)
// perf_span and perf_span_dispatch are omitted on purpose
.finish()
}
}
/// The kind of access to the page cache.
@@ -157,24 +179,23 @@ impl RequestContextBuilder {
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
perf_span: None,
perf_span_dispatch: None,
},
}
}
pub fn extend(original: &RequestContext) -> Self {
pub fn from(original: &RequestContext) -> Self {
Self {
// This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it.
inner: RequestContext {
task_kind: original.task_kind,
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
},
inner: original.clone(),
}
}
pub fn task_kind(mut self, b: TaskKind) -> Self {
self.inner.task_kind = b;
self
}
/// Configure the DownloadBehavior of the context: whether to
/// download missing layers, and/or warn on the download.
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
@@ -199,7 +220,52 @@ impl RequestContextBuilder {
self
}
pub fn build(self) -> RequestContext {
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
self.inner.perf_span_dispatch = dispatch;
self
}
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce() -> Span,
{
assert!(self.inner.perf_span.is_none());
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
self
}
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
where
Fn: FnOnce(&Span) -> Span,
{
if let Some(ref perf_span) = self.inner.perf_span {
assert!(self.inner.perf_span_dispatch.is_some());
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
}
self
}
pub fn root(self) -> RequestContext {
self.inner
}
pub fn attached_child(self) -> RequestContext {
self.inner
}
pub fn detached_child(self) -> RequestContext {
self.inner
}
}
@@ -220,7 +286,7 @@ impl RequestContext {
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::new(task_kind)
.download_behavior(download_behavior)
.build()
.root()
}
/// Create a detached child context for a task that may outlive `self`.
@@ -241,7 +307,10 @@ impl RequestContext {
///
/// We could make new calls to this function fail if `self` is already canceled.
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
self.child_impl(task_kind, download_behavior)
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.detached_child()
}
/// Create a child of context `self` for a task that shall not outlive `self`.
@@ -265,7 +334,7 @@ impl RequestContext {
/// The method to wait for child tasks would return an error, indicating
/// that the child task was not started because the context was canceled.
pub fn attached_child(&self) -> Self {
self.child_impl(self.task_kind(), self.download_behavior())
RequestContextBuilder::from(self).attached_child()
}
/// Use this function when you should be creating a child context using
@@ -280,10 +349,6 @@ impl RequestContext {
Self::new(task_kind, download_behavior)
}
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
}
pub fn task_kind(&self) -> TaskKind {
self.task_kind
}
@@ -303,4 +368,51 @@ impl RequestContext {
pub(crate) fn read_path_debug(&self) -> bool {
self.read_path_debug
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());
}
}
pub(crate) fn maybe_instrument<'a, Fut, Fn>(
&self,
future: Fut,
make_span: Fn,
) -> BoxFuture<'a, Fut::Output>
where
Fut: Future + Send + 'a,
Fn: FnOnce(&Span) -> Span,
{
match &self.perf_span {
Some(perf_span) => {
assert!(self.perf_span_dispatch.is_some());
let dispatcher = self.perf_span_dispatch.as_ref().unwrap();
let new_span =
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
future.instrument(new_perf_span).boxed()
}
None => future.boxed(),
}
}
pub(crate) fn perf_span_record<
Q: tracing::field::AsField + ?Sized,
V: tracing::field::Value,
>(
&self,
field: &Q,
value: V,
) {
if let Some(span) = &self.perf_span {
span.record(field, value);
}
}
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
}

View File

@@ -2650,9 +2650,10 @@ async fn getpage_at_lsn_handler_inner(
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Enable read path debugging
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
.download_behavior(DownloadBehavior::Download)
.read_path_debug(true)
.root();
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
// Use last_record_lsn if no lsn is provided

View File

@@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;

View File

@@ -2,6 +2,7 @@
//! requests.
use std::borrow::Cow;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::str::FromStr;
@@ -9,6 +10,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
@@ -17,7 +19,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
PageServiceProtocolPipelinedExecutionStrategy, Tracing,
};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{
@@ -36,6 +38,7 @@ use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use rand::Rng;
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
@@ -53,7 +56,7 @@ use utils::sync::spsc_fold;
use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
};
@@ -99,6 +102,7 @@ pub fn spawn(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
pg_auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
) -> Listener {
let cancel = CancellationToken::new();
@@ -116,6 +120,7 @@ pub fn spawn(
conf,
tenant_manager,
pg_auth,
perf_trace_dispatch,
tcp_listener,
conf.pg_auth_type,
conf.page_service_pipelining.clone(),
@@ -172,6 +177,7 @@ pub async fn libpq_listener_main(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
pipelining_config: PageServicePipeliningConfig,
@@ -204,8 +210,12 @@ pub async fn libpq_listener_main(
// Connection established. Spawn a new task to handle it.
debug!("accepted connection from {}", peer_addr);
let local_auth = auth.clone();
let connection_ctx = listener_ctx
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
.task_kind(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch.clone())
.detached_child();
connection_handler_tasks.spawn(page_service_conn_main(
conf,
tenant_manager.clone(),
@@ -600,6 +610,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
ctx: RequestContext,
}
#[cfg(feature = "testing")]
@@ -736,6 +747,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
tracing_config: Option<&Tracing>,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -895,10 +907,55 @@ impl PageServerHandler {
}
let key = rel_block_to_key(req.rel, req.blkno);
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.await
{
let sampled = match tracing_config {
Some(conf) => {
let ratio = &conf.sampling_ratio;
if ratio.numerator == 0 {
false
} else {
rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
}
}
None => false,
};
let get_page_context = if sampled {
RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid,
key = %key)
})
.attached_child()
} else {
ctx.attached_child()
};
let res = get_page_context
.maybe_instrument(
timeline_handles.get(tenant_id, timeline_id, ShardSelector::Page(key)),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"SHARD_SELECTION",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await;
let shard = match res {
Ok(tl) => tl,
Err(e) => {
let span = mkspan!(before shard routing);
@@ -927,24 +984,59 @@ impl PageServerHandler {
};
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
)
.await?;
// TODO(vlad): why does this not show up?
get_page_context.perf_span_record(
"shard",
tracing::field::display(shard.get_shard_identity().shard_slug()),
);
let timer = get_page_context
.maybe_instrument(
record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"THROTTLE",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
let res = get_page_context
.maybe_instrument(
Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
ctx,
),
|current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"WAIT_LSN",
tenant_id = %tenant_id,
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
request_id = %req.hdr.reqid
)
},
)
.await;
let effective_request_lsn = match res {
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
@@ -954,7 +1046,11 @@ impl PageServerHandler {
span,
shard: shard.downgrade(),
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
ctx: get_page_context
}],
}
}
#[cfg(feature = "testing")]
@@ -1486,12 +1582,15 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let cancel = self.cancel.clone();
let tracing_config = self.conf.tracing.clone();
let err = loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel,
ctx,
protocol_version,
@@ -1625,6 +1724,8 @@ impl PageServerHandler {
// Batcher
//
let tracing_config = self.conf.tracing.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1638,6 +1739,7 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
tracing_config.as_ref(),
&cancel_batcher,
&ctx,
protocol_version,
@@ -1976,7 +2078,9 @@ impl PageServerHandler {
let results = timeline
.get_rel_page_at_lsn_batched(
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
requests
.iter()
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
effective_lsn,
io_concurrency,
ctx,
@@ -2527,6 +2631,26 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
info!("got options: {options:?}");
match serde_json::from_str::<HashMap<String, String>>(&options) {
Ok(traceoptions) => {
if !traceoptions.is_empty() {
use opentelemetry;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let parent_ctx =
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&traceoptions));
Span::current().set_parent(parent_ctx);
info!("installed span parent");
}
}
Err(_) => {
error!("could not deserialize 'options', ignoring");
}
}
}
};
Ok(())

View File

@@ -9,6 +9,7 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, ensure};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
@@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
@@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::RequestContext;
use crate::context::{RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -209,7 +210,9 @@ impl Timeline {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
effective_lsn,
io_concurrency.clone(),
ctx,
@@ -248,7 +251,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
effective_lsn: Lsn,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
@@ -262,8 +265,11 @@ impl Timeline {
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
BTreeMap::default();
let mut perf_instrument = false;
for (response_slot_idx, (tag, blknum, req_ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -273,6 +279,7 @@ impl Timeline {
continue;
}
// TODO: perf span
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
.await
@@ -297,8 +304,12 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
if req_ctx.has_perf_span() {
perf_instrument = true;
}
let key_slots = keys_slots.entry(key).or_default();
key_slots.push(response_slot_idx);
key_slots.push((response_slot_idx, req_ctx));
}
let keyspace = {
@@ -314,16 +325,36 @@ impl Timeline {
acc.to_keyspace()
};
match self
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
.await
{
let get_vectored_ctx = match perf_instrument {
true => RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_VECTORED",
tenant_id = %self.tenant_shard_id.tenant_id,
timeline_id = %self.timeline_id,
lsn = %effective_lsn,
shard = %self.tenant_shard_id.shard_slug(),
)
})
.attached_child(),
false => ctx.attached_child(),
};
let res = get_vectored_ctx
.maybe_instrument(
self.get_vectored(keyspace, effective_lsn, io_concurrency, &get_vectored_ctx),
|current_perf_span| current_perf_span.clone(),
)
.await;
match res {
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
let first_slot = key_slots.next().unwrap();
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
for slot in key_slots {
for (slot, req_ctx) in key_slots {
let clone = match &res {
Ok(buf) => Ok(buf.clone()),
Err(err) => Err(match err {
@@ -341,17 +372,19 @@ impl Timeline {
};
result_slots[slot].write(clone);
req_ctx.perf_follows_from(&get_vectored_ctx);
slots_filled += 1;
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(&get_vectored_ctx);
slots_filled += 1;
}
}
Err(err) => {
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
for slot in keys_slots.values().flatten() {
for (slot, req_ctx) in keys_slots.values().flatten() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
let err = match &err {
@@ -383,6 +416,7 @@ impl Timeline {
}
};
req_ctx.perf_follows_from(&get_vectored_ctx);
result_slots[*slot].write(err);
}

View File

@@ -217,9 +217,10 @@ pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
pageserver_runtime!(OTEL_RUNTIME, "open telemetry worker");
// Bump this number when adding a new pageserver_runtime!
// SAFETY: it's obviously correct
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(5) };
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

View File

@@ -5720,7 +5720,7 @@ pub(crate) mod harness {
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -13,13 +13,13 @@ pub mod merge_iterator;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::PERF_TRACE_TARGET;
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
use bytes::Bytes;
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use tracing::{Instrument, trace};
use tracing::{Instrument, info_span, trace};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;
@@ -43,7 +43,7 @@ use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -874,13 +874,51 @@ impl ReadableLayer {
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
let persistent_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
persistent_context
.maybe_instrument(
layer.get_values_reconstruct_data(
keyspace,
lsn_range,
reconstruct_state,
&persistent_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
}
ReadableLayer::InMemoryLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
let in_mem_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_LAYER",
layer = %layer
)
})
.attached_child();
in_mem_context
.maybe_instrument(
layer.get_values_reconstruct_data(
keyspace,
lsn_range,
reconstruct_state,
&in_mem_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
}
}

View File

@@ -896,9 +896,9 @@ impl DeltaLayerInner {
where
Reader: BlockReader + Clone,
{
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
.attached_child();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
all_keys.push(entry);
true
},
&RequestContextBuilder::extend(ctx)
&RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
.attached_child(),
)
.await?;
if let Some(last) = all_keys.last_mut() {

View File

@@ -481,9 +481,9 @@ impl ImageLayerInner {
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build();
.attached_child();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;

View File

@@ -420,9 +420,9 @@ impl InMemoryLayer {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
.attached_child();
let inner = self.inner.read().await;

View File

@@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use crate::PERF_TRACE_TARGET;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use tracing::Instrument;
use tracing::{Instrument, info_span};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -324,16 +325,29 @@ impl Layer {
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let downloaded =
self.0
.get_or_maybe_download(true, ctx)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let get_layer_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_LAYER",
)
})
.attached_child();
let downloaded = get_layer_context
.maybe_instrument(
self.0.get_or_maybe_download(true, &get_layer_context),
|crnt_perf_context| crnt_perf_context.clone(),
)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let this = ResidentLayer {
downloaded: downloaded.clone(),
owner: self.clone(),
@@ -341,9 +355,29 @@ impl Layer {
self.record_access(ctx);
downloaded
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
let visit_layer_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"VISIT_LAYER",
)
})
.attached_child();
visit_layer_context
.maybe_instrument(
downloaded
.get_values_reconstruct_data(
this,
keyspace,
lsn_range,
reconstruct_data,
&visit_layer_context,
)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await
.map_err(|err| match err {
GetVectoredError::Other(err) => GetVectoredError::Other(
@@ -1045,15 +1079,36 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
let download_ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
async move {
tracing::info!(%reason, "downloading on-demand");
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, download_ctx)
let res = download_ctx
.maybe_instrument(
self.download_init_and_wait(timeline, permit, download_ctx.attached_child()),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok(res)
}
@@ -1720,9 +1775,9 @@ impl DownloadedLayer {
);
let res = if owner.desc.is_delta {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
.build();
.attached_child();
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,
owner.desc.timeline_id,
@@ -1738,9 +1793,9 @@ impl DownloadedLayer {
.await
.map(LayerKind::Delta)
} else {
let ctx = RequestContextBuilder::extend(ctx)
let ctx = RequestContextBuilder::from(ctx)
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
.build();
.attached_child();
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,

View File

@@ -644,9 +644,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
let ctx = RequestContextBuilder::from(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
.attached_child();
let layer = {
let mut layers = {
@@ -729,9 +729,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
let ctx = RequestContextBuilder::from(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
.attached_child();
let layer = {
let mut layers = {

View File

@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -94,7 +95,7 @@ use super::{
};
use crate::aux_file::AuxFileSizeEstimator;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
@@ -1274,9 +1275,28 @@ impl Timeline {
};
reconstruct_state.read_path = read_path;
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
let plan_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO",
)
})
.attached_child();
let traversal_res: Result<(), _> = plan_context
.maybe_instrument(
self.get_vectored_reconstruct_data(
keyspace.clone(),
lsn,
reconstruct_state,
&plan_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
@@ -1290,14 +1310,45 @@ impl Timeline {
let layers_visited = reconstruct_state.get_layers_visited();
let execute_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT",
)
})
.attached_child();
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let execute_key_context = RequestContextBuilder::from(&execute_context)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"RECONSTRUCT_KEY",
key = %key,
)
})
.attached_child();
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let converted = match state.collect_pending_ios().await {
let res = execute_key_context
.maybe_instrument(state.collect_pending_ios(), |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"EXECUTE_IO",
)
})
.await;
let converted = match res {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
@@ -1314,16 +1365,31 @@ impl Timeline {
"{converted:?}"
);
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
let walredo_deltas = converted.num_deltas();
let walredo_res = execute_key_context
.maybe_instrument(
walredo_self.reconstruct_value(key, lsn, converted),
|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WALREDO",
deltas = %walredo_deltas,
)
},
)
.await;
(key, walredo_res)
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
let results = execute_context
.maybe_instrument(
futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>(),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await;
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -3801,18 +3867,34 @@ impl Timeline {
return Err(GetVectoredError::Cancelled);
}
let plan_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"PLAN_IO_TIMELINE",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
)
})
.attached_child();
let TimelineVisitOutcome {
completed_keyspace: completed,
image_covered_keyspace,
} = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
ctx,
)
.await?;
} = plan_context
.maybe_instrument(
Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
&plan_context,
),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
keyspace.remove_overlapping_with(&completed);
@@ -3856,8 +3938,26 @@ impl Timeline {
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
let get_ancestor_context = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_ANCESTOR",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
ancestor = %ancestor_timeline.timeline_id,
ancestor_lsn = %timeline.ancestor_lsn
)
})
.attached_child();
timeline_owned = get_ancestor_context
.maybe_instrument(
timeline.get_ready_ancestor_timeline(ancestor_timeline, &get_ancestor_context),
|crnt_perf_span| crnt_perf_span.clone(),
)
.await?;
timeline = &*timeline_owned;
};
@@ -7192,9 +7292,9 @@ mod tests {
eprintln!("Downloading {layer} and re-generating heatmap");
let ctx = &RequestContextBuilder::extend(&ctx)
let ctx = &RequestContextBuilder::from(&ctx)
.download_behavior(crate::context::DownloadBehavior::Download)
.build();
.attached_child();
let _resident = layer
.download_and_keep_resident(ctx)

View File

@@ -983,9 +983,9 @@ impl Timeline {
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
let image_ctx = RequestContextBuilder::from(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
.attached_child();
let mut partitioning = dense_partitioning;
partitioning

View File

@@ -46,7 +46,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let otlp_layer =
tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer::new(

View File

@@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum):
raise ValueError(f"Unknown protocol type: {proto}")
@dataclass
class PageserverTracingConfig:
sampling_ratio: tuple[int, int]
endpoint: str
protocol: str
timeout: str
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"sampling-ratio": {
"numerator": self.sampling_ratio[0],
"denominator": self.sampling_ratio[1],
},
"export-config": {
"endpoint": self.endpoint,
"protocol": self.protocol,
"timeout": self.timeout,
},
}
return ("tracing", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -425,6 +447,7 @@ class NeonEnvBuilder:
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -468,6 +491,8 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
)
@@ -1113,6 +1138,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1216,6 +1242,14 @@ class NeonEnv:
if key not in ps_cfg:
ps_cfg[key] = value
if self.pageserver_tracing_config is not None:
key, value = self.pageserver_tracing_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])

View File

@@ -106,6 +106,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*delaying layer flush by \\S+ for compaction backpressure.*",
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
)

View File

@@ -10,6 +10,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverTracingConfig,
PgBin,
wait_for_last_flush_lsn,
)
@@ -113,6 +114,15 @@ def setup_and_run_pagebench_benchmark(
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
tracing_config = PageserverTracingConfig(
sampling_ratio=(0, 1000),
endpoint="http://localhost:4318/v1/traces",
protocol="http-binary",
timeout="10s",
)
neon_env_builder.pageserver_tracing_config = tracing_config
ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1]
params.update(
{
"pageserver_config_override.page_cache_size": (
@@ -120,6 +130,7 @@ def setup_and_run_pagebench_benchmark(
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
"pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
}
)