Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-30 16:50:37 +00:00

Compare commits: 10 commits, local-prox… → vlad/perf-…
| Author | SHA1 | Date |
|---|---|---|
| | c4ce5dda5d | |
| | 74c555fd06 | |
| | 716bb3c361 | |
| | b90b945c02 | |
| | ac1159cb33 | |
| | 74a7f68da0 | |
| | 2acca9fe33 | |
| | a2b9ff0d40 | |
| | a7f60dd5d0 | |
| | 2a1cbca4e5 | |
Cargo.lock (generated, 4 changes)
@@ -4301,6 +4301,7 @@ dependencies = [
  "tokio-util",
  "toml_edit",
  "tracing",
+ "tracing-utils",
  "url",
  "utils",
  "wal_decoder",
@@ -4337,6 +4338,7 @@ dependencies = [
  "strum",
  "strum_macros",
  "thiserror 1.0.69",
+ "tracing-utils",
  "utils",
 ]
@@ -7566,6 +7568,7 @@ dependencies = [
  "opentelemetry-otlp",
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
+ "pin-project-lite",
  "tokio",
  "tracing",
  "tracing-opentelemetry",
@@ -7806,6 +7809,7 @@ dependencies = [
  "tracing",
  "tracing-error",
  "tracing-subscriber",
+ "tracing-utils",
  "walkdir",
 ]
@@ -592,6 +592,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
     utils::logging::init(
         utils::logging::LogFormat::Json,
         utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
+        utils::logging::OtelEnablement::Disabled,
         utils::logging::Output::Stdout,
     )?;
@@ -24,7 +24,8 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
         .with_writer(std::io::stderr);

     // Initialize OpenTelemetry
-    let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
+    let otlp_layer =
+        tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;

     // Put it all together
     tracing_subscriber::registry()
@@ -158,6 +158,7 @@ mod reliable_copy_test {
         utils::logging::init(
             utils::logging::LogFormat::Test,
             utils::logging::TracingErrorLayerEnablement::Disabled,
+            utils::logging::OtelEnablement::Disabled,
             utils::logging::Output::Stdout,
         )
         .expect("logging init failed");
@@ -34,6 +34,7 @@ postgres_backend.workspace = true
 nix = {workspace = true, optional = true}
 reqwest.workspace = true
 rand.workspace = true
+tracing-utils.workspace = true

 [dev-dependencies]
 bincode.workspace = true
@@ -127,6 +127,7 @@ pub struct ConfigToml {
     pub load_previous_heatmap: Option<bool>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub generate_unarchival_heatmap: Option<bool>,
+    pub tracing: Option<Tracing>,
 }

 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -184,6 +185,58 @@ pub enum GetVectoredConcurrentIo {
     SidecarTask,
 }

+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct Ratio {
+    pub numerator: usize,
+    pub denominator: usize,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct OtelExporterConfig {
+    pub endpoint: String,
+    pub protocol: OtelExporterProtocol,
+    #[serde(with = "humantime_serde")]
+    pub timeout: Duration,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum OtelExporterProtocol {
+    Grpc,
+    HttpBinary,
+    HttpJson,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+#[serde(rename_all = "kebab-case")]
+pub struct Tracing {
+    pub sampling_ratio: Ratio,
+    pub export_config: OtelExporterConfig,
+}
+
+impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
+    fn from(val: &OtelExporterConfig) -> Self {
+        tracing_utils::ExportConfig {
+            endpoint: Some(val.endpoint.clone()),
+            protocol: val.protocol.into(),
+            timeout: val.timeout,
+        }
+    }
+}
+
+impl From<OtelExporterProtocol> for tracing_utils::Protocol {
+    fn from(val: OtelExporterProtocol) -> Self {
+        match val {
+            OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
+            OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
+            OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
+        }
+    }
+}
+
 pub mod statvfs {
     pub mod mock {
         #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
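For reference, a `pageserver.toml` fragment accepted by this schema might look like the following sketch; the endpoint value is a placeholder, and the kebab-case field names follow the `serde(rename_all)` attributes above.

```toml
[tracing]
# Sample roughly 1 in 1000 requests (numerator / denominator).
sampling-ratio = { numerator = 1, denominator = 1000 }

[tracing.export-config]
endpoint = "http://localhost:4318/v1/traces"  # placeholder OTLP endpoint
protocol = "http-binary"
timeout = "10s"  # parsed via humantime_serde
```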
@@ -529,6 +582,7 @@ impl Default for ConfigToml {
             validate_wal_contiguity: None,
             load_previous_heatmap: None,
             generate_unarchival_heatmap: None,
+            tracing: None,
         }
     }
 }
@@ -208,6 +208,7 @@ pub(crate) fn ensure_logging_ready() {
         utils::logging::init(
             utils::logging::LogFormat::Test,
             utils::logging::TracingErrorLayerEnablement::Disabled,
+            utils::logging::OtelEnablement::Disabled,
             utils::logging::Output::Stdout,
         )
         .expect("logging init failed");
@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 tracing.workspace = true
 tracing-opentelemetry.workspace = true
 tracing-subscriber.workspace = true
+pin-project-lite.workspace = true

 [dev-dependencies]
 tracing-subscriber.workspace = true # For examples in docs
@@ -21,7 +21,8 @@
 //!     .with_writer(std::io::stderr);
 //!
 //!     // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
-//!     let otlp_layer = tracing_utils::init_tracing("my_application").await;
+//!     let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
 //!
 //!     // Put it all together
 //!     tracing_subscriber::registry()
@@ -31,13 +31,15 @@
 //!         .init();
 //! }
 //! ```
 #![deny(unsafe_code)]
 #![deny(clippy::undocumented_unsafe_blocks)]

 pub mod http;
+pub mod perf_span;

 use opentelemetry::KeyValue;
 use opentelemetry::trace::TracerProvider;
 use opentelemetry_otlp::WithExportConfig;
+pub use opentelemetry_otlp::{ExportConfig, Protocol};
 use tracing::Subscriber;
 use tracing_subscriber::Layer;
 use tracing_subscriber::registry::LookupSpan;
@@ -69,19 +71,28 @@ use tracing_subscriber::registry::LookupSpan;
 ///
 /// This doesn't block, but is marked as 'async' to hint that this must be called in
 /// asynchronous execution context.
-pub async fn init_tracing<S>(service_name: &str) -> Option<impl Layer<S>>
+pub async fn init_tracing<S>(
+    service_name: &str,
+    export_config: ExportConfig,
+) -> Option<impl Layer<S>>
 where
     S: Subscriber + for<'span> LookupSpan<'span>,
 {
     if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
         return None;
     };
-    Some(init_tracing_internal(service_name.to_string()))
+    Some(init_tracing_internal(
+        service_name.to_string(),
+        export_config,
+    ))
 }

 /// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
 /// tasks.
-pub fn init_tracing_without_runtime<S>(service_name: &str) -> Option<impl Layer<S>>
+pub fn init_tracing_without_runtime<S>(
+    service_name: &str,
+    export_config: ExportConfig,
+) -> Option<impl Layer<S>>
 where
     S: Subscriber + for<'span> LookupSpan<'span>,
 {
@@ -112,16 +123,20 @@ where
     ));
     let _guard = runtime.enter();

-    Some(init_tracing_internal(service_name.to_string()))
+    Some(init_tracing_internal(
+        service_name.to_string(),
+        export_config,
+    ))
 }

-fn init_tracing_internal<S>(service_name: String) -> impl Layer<S>
+fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
 where
     S: Subscriber + for<'span> LookupSpan<'span>,
 {
     // Sets up exporter from the OTEL_EXPORTER_* environment variables.
     let exporter = opentelemetry_otlp::SpanExporter::builder()
         .with_http()
+        .with_export_config(export_config)
         .build()
         .expect("could not initialize opentelemetry exporter");
libs/tracing-utils/src/perf_span.rs (new file, 149 lines)
@@ -0,0 +1,149 @@
//! Crutch module to work around tracing infrastructure deficiencies
//!
//! We wish to collect granular request spans without impacting performance
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
//!
//! The approach taken by the pageserver crate is to use a completely different
//! span hierarchy for the performance spans. Spans are explicitly stored in
//! the request context and use a different [`tracing::Subscriber`] in order
//! to avoid expensive filtering.
//!
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implicitly,
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
//! wishes to juggle different subscribers.
//!
//! In order to work around this, this module provides a [`PerfSpan`] type which
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
//! achieves the correct routing.
//!
//! There's also a modified version of [`tracing::Instrument`] which works with
//! [`PerfSpan`].

use core::{
    future::Future,
    marker::Sized,
    mem::ManuallyDrop,
    pin::Pin,
    task::{Context, Poll},
};
use pin_project_lite::pin_project;
use tracing::{Dispatch, field, span::Span};

#[derive(Debug, Clone)]
pub struct PerfSpan {
    inner: ManuallyDrop<Span>,
    dispatch: Dispatch,
}

#[must_use = "once a span has been entered, it should be exited"]
pub struct PerfSpanEntered<'a> {
    span: &'a PerfSpan,
}

impl PerfSpan {
    pub fn new(span: Span, dispatch: Dispatch) -> Self {
        Self {
            inner: ManuallyDrop::new(span),
            dispatch,
        }
    }

    pub fn record<Q: field::AsField + ?Sized, V: field::Value>(
        &self,
        field: &Q,
        value: V,
    ) -> &Self {
        self.inner.record(field, value);
        self
    }

    pub fn enter(&self) -> PerfSpanEntered {
        PerfSpanEntered { span: self }
    }

    pub fn inner(&self) -> &Span {
        &self.inner
    }
}

impl Drop for PerfSpan {
    fn drop(&mut self) {
        // Bring the desired dispatch into scope before explicitly calling
        // the span destructor. This routes the span exit to the correct
        // [`tracing::Subscriber`].
        let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);

        // SAFETY: ManuallyDrop in Drop implementation
        unsafe { ManuallyDrop::drop(&mut self.inner) }
    }
}

impl Drop for PerfSpanEntered<'_> {
    fn drop(&mut self) {
        assert!(self.span.inner.id().is_some());

        let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
        self.span.dispatch.exit(&self.span.inner.id().unwrap());
    }
}

pub trait PerfInstrument: Sized {
    fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
        PerfInstrumented {
            inner: ManuallyDrop::new(self),
            span,
        }
    }
}

pin_project! {
    #[project = PerfInstrumentedProj]
    #[derive(Debug, Clone)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct PerfInstrumented<T> {
        // `ManuallyDrop` is used here so that `Drop` can enter the `Span`
        // before executing `ManuallyDrop::drop`.
        #[pin]
        inner: ManuallyDrop<T>,
        span: PerfSpan,
    }

    impl<T> PinnedDrop for PerfInstrumented<T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            let _enter = this.span.enter();
            // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
            //             different from wrapping `T` in `Option` and calling
            //             `Pin::set(&mut this.inner, None)`, except avoiding
            //             additional memory overhead.
            //         2. `ManuallyDrop::drop()` is safe, because
            //            `PinnedDrop::drop()` is guaranteed to be called only
            //            once.
            unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
        }
    }
}

impl<'a, T> PerfInstrumentedProj<'a, T> {
    /// Get a mutable reference to the [`Span`] and a pinned mutable reference
    /// to the wrapped type.
    fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
        // SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
        //         and `inner` is valid, because `ManuallyDrop::drop` is called
        //         only inside `Drop` of the `Instrumented`.
        let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
        (self.span, inner)
    }
}

impl<T: Future> Future for PerfInstrumented<T> {
    type Output = T::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        inner.poll(cx)
    }
}

impl<T: Sized> PerfInstrument for T {}
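A minimal usage sketch (not part of the diff) of how `PerfSpan` and `PerfInstrument` compose, assuming a dedicated perf `Dispatch` built elsewhere; the span name and target are illustrative:

```rust
use tracing::Dispatch;
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};

async fn traced_operation(perf_dispatch: Dispatch) {
    // Create the span against the perf subscriber, not the global default,
    // by temporarily swapping the default dispatch.
    let span = tracing::dispatcher::with_default(&perf_dispatch, || {
        tracing::info_span!(target: "P", "EXAMPLE_OP")
    });

    // PerfSpan remembers the dispatch, so span enter/exit is routed back to
    // the same subscriber (the workaround described in the module docs).
    let perf_span = PerfSpan::new(span, perf_dispatch);

    // The PerfInstrument blanket impl enters the PerfSpan around each poll.
    async { /* the instrumented work */ }.instrument(perf_span).await;
}
```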
@@ -42,6 +42,7 @@ toml_edit = { workspace = true, features = ["serde"] }
 tracing.workspace = true
 tracing-error.workspace = true
 tracing-subscriber = { workspace = true, features = ["json", "registry"] }
+tracing-utils.workspace = true
 rand.workspace = true
 scopeguard.workspace = true
 strum.workspace = true
@@ -7,7 +7,9 @@ use metrics::{IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use strum_macros::{EnumString, VariantNames};
 use tokio::time::Instant;
+use tracing::Dispatch;
 use tracing::info;
+use tracing::level_filters::LevelFilter;

 /// Logs a critical error, similarly to `tracing::error!`. This will:
 ///
@@ -125,6 +127,15 @@ pub enum TracingErrorLayerEnablement {
     EnableWithRustLogFilter,
 }

+pub enum OtelEnablement {
+    Disabled,
+    Enabled {
+        service_name: String,
+        export_config: tracing_utils::ExportConfig,
+        runtime: &'static tokio::runtime::Runtime,
+    },
+}
+
 /// Where the logging should output to.
 #[derive(Clone, Copy)]
 pub enum Output {
@@ -132,11 +143,24 @@ pub enum Output {
     Stderr,
 }

+pub struct OtelGuard {
+    pub dispatch: Dispatch,
+}
+
+impl Drop for OtelGuard {
+    fn drop(&mut self) {
+        tracing_utils::shutdown_tracing();
+    }
+}
+
+pub const PERF_TRACE_TARGET: &str = "P";
+
 pub fn init(
     log_format: LogFormat,
     tracing_error_layer_enablement: TracingErrorLayerEnablement,
+    otel_enablement: OtelEnablement,
     output: Output,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Option<OtelGuard>> {
     // We fall back to printing all spans at info-level or above if
     // the RUST_LOG environment variable is not set.
     let rust_log_env_filter = || {
@@ -165,6 +189,7 @@ pub fn init(
         };
         log_layer.with_filter(rust_log_env_filter())
     });

     let r = r.with(
         TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
     );
@@ -175,7 +200,26 @@ pub fn init(
         TracingErrorLayerEnablement::Disabled => r.init(),
     }

-    Ok(())
+    let otel_subscriber = match otel_enablement {
+        OtelEnablement::Disabled => None,
+        OtelEnablement::Enabled {
+            service_name,
+            export_config,
+            runtime,
+        } => {
+            let otel_layer = runtime
+                .block_on(tracing_utils::init_tracing(&service_name, export_config))
+                .with_filter(LevelFilter::INFO);
+            let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
+            let otel_dispatch = Dispatch::new(otel_subscriber);
+
+            Some(otel_dispatch)
+        }
+    };
+
+    let otel_guard = otel_subscriber.map(|dispatch| OtelGuard { dispatch });
+
+    Ok(otel_guard)
 }

 /// Disable the default rust panic hook by using `set_hook`.
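A sketch of a call site for the new signature (not part of the diff); the service name is illustrative, and `OTEL_RUNTIME` refers to the pageserver runtime added later in this comparison:

```rust
// Hedged sketch: enable OTEL export and hold on to the returned guard.
let otel_guard = utils::logging::init(
    utils::logging::LogFormat::Plain,
    utils::logging::TracingErrorLayerEnablement::Disabled,
    utils::logging::OtelEnablement::Enabled {
        service_name: "example-service".to_string(), // illustrative
        export_config: tracing_utils::ExportConfig::default(),
        runtime: *OTEL_RUNTIME, // a &'static tokio Runtime
    },
    utils::logging::Output::Stdout,
)?;

// Dropping the guard calls tracing_utils::shutdown_tracing(); its `dispatch`
// field can be cloned to route perf spans to the OTEL subscriber.
if let Some(guard) = &otel_guard {
    let _perf_dispatch = guard.dispatch.clone();
}
```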
@@ -66,6 +66,7 @@ tokio-stream.workspace = true
 tokio-util.workspace = true
 toml_edit = { workspace = true, features = [ "serde" ] }
 tracing.workspace = true
+tracing-utils.workspace = true
 url.workspace = true
 walkdir.workspace = true
 metrics.workspace = true
@@ -10,9 +10,10 @@ pub(crate) fn setup_logging() {
         logging::init(
             logging::LogFormat::Test,
             logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
+            utils::logging::OtelEnablement::Disabled,
             logging::Output::Stdout,
         )
-        .expect("Failed to init test logging")
+        .expect("Failed to init test logging");
     });
 }
@@ -117,6 +117,7 @@ async fn main() -> anyhow::Result<()> {
     logging::init(
         LogFormat::Plain,
         TracingErrorLayerEnablement::EnableWithRustLogFilter,
+        utils::logging::OtelEnablement::Disabled,
         logging::Output::Stdout,
     )?;
@@ -35,6 +35,7 @@ fn main() {
     logging::init(
         logging::LogFormat::Plain,
         logging::TracingErrorLayerEnablement::Disabled,
+        utils::logging::OtelEnablement::Disabled,
         logging::Output::Stderr,
     )
     .unwrap();
@@ -21,7 +21,8 @@ use pageserver::deletion_queue::DeletionQueue;
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
 use pageserver::task_mgr::{
-    BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
+    BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, OTEL_RUNTIME,
+    WALRECEIVER_RUNTIME,
 };
 use pageserver::tenant::{TenantSharedResources, mgr, secondary};
 use pageserver::{
@@ -36,7 +37,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::auth::{JwtAuth, SwappableJwtAuth};
 use utils::crashsafe::syncfs;
-use utils::logging::TracingErrorLayerEnablement;
+use utils::logging::{OtelGuard, TracingErrorLayerEnablement};
 use utils::sentry_init::init_sentry;
 use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
@@ -110,12 +111,27 @@ fn main() -> anyhow::Result<()> {
     } else {
         TracingErrorLayerEnablement::Disabled
     };
-    logging::init(
+
+    let otel_enablement = match &conf.tracing {
+        Some(cfg) => utils::logging::OtelEnablement::Enabled {
+            service_name: "pageserver".to_string(),
+            export_config: (&cfg.export_config).into(),
+            runtime: *OTEL_RUNTIME,
+        },
+        None => utils::logging::OtelEnablement::Disabled,
+    };
+
+    let otel_guard = logging::init(
         conf.log_format,
         tracing_error_layer_enablement,
+        otel_enablement,
         logging::Output::Stdout,
     )?;
+
+    if otel_guard.is_some() {
+        info!(?conf.tracing, "starting with OTEL tracing enabled");
+    }

     // mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
     // disarming this hook on pageserver, because we never tear down tracing.
     logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -189,7 +205,7 @@ fn main() -> anyhow::Result<()> {
     tracing::info!("Initializing page_cache...");
     page_cache::init(conf.page_cache_size);

-    start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
+    start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;

     scenario.teardown();
     Ok(())
@@ -287,6 +303,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
 fn start_pageserver(
     launch_ts: &'static LaunchTimestamp,
     conf: &'static PageServerConf,
+    otel_guard: Option<OtelGuard>,
 ) -> anyhow::Result<()> {
     // Monotonic time for later calculating startup duration
     let started_startup_at = Instant::now();
@@ -634,13 +651,21 @@ fn start_pageserver(

     // Spawn a task to listen for libpq connections. It will spawn further tasks
     // for each connection. We created the listener earlier already.
-    let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
-        let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
-        pageserver_listener
-            .set_nonblocking(true)
-            .context("set listener to nonblocking")?;
-        tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
-    });
+    let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
+    let page_service = page_service::spawn(
+        conf,
+        tenant_manager.clone(),
+        pg_auth,
+        perf_trace_dispatch,
+        {
+            let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
+            pageserver_listener
+                .set_nonblocking(true)
+                .context("set listener to nonblocking")?;
+            tokio::net::TcpListener::from_std(pageserver_listener)
+                .context("create tokio listener")?
+        },
+    );

     // All started up! Now just sit and wait for shutdown signal.
     BACKGROUND_RUNTIME.block_on(async move {
@@ -201,6 +201,8 @@ pub struct PageServerConf {

     /// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
     pub generate_unarchival_heatmap: bool,
+
+    pub tracing: Option<pageserver_api::config::Tracing>,
 }

 /// Token for authentication to safekeepers
@@ -367,6 +369,7 @@ impl PageServerConf {
             validate_wal_contiguity,
             load_previous_heatmap,
             generate_unarchival_heatmap,
+            tracing,
         } = config_toml;

         let mut conf = PageServerConf {
@@ -412,6 +415,7 @@ impl PageServerConf {
             wal_receiver_protocol,
             page_service_pipelining,
             get_vectored_concurrent_io,
+            tracing,

             // ------------------------------------------------------------
             // fields that require additional validation or custom handling
@@ -476,6 +480,17 @@ impl PageServerConf {
             );
         }

+        if let Some(tracing_config) = conf.tracing.as_ref() {
+            let ratio = &tracing_config.sampling_ratio;
+            ensure!(
+                ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
+                format!(
+                    "Invalid sampling ratio: {}/{}",
+                    ratio.numerator, ratio.denominator
+                )
+            );
+        }
+
         IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
             .map_err(anyhow::Error::msg)
             .with_context(|| {
@@ -89,16 +89,38 @@
 //! [`RequestContext`] argument. Functions in the middle of the call chain
 //! only need to pass it on.

+use futures::FutureExt;
+use futures::future::BoxFuture;
+use std::future::Future;
+use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
+
+use tracing::{Dispatch, Span};

 use crate::task_mgr::TaskKind;

 // The main structure of this module, see module-level comment.
-#[derive(Debug)]
+#[derive(Clone)]
 pub struct RequestContext {
     task_kind: TaskKind,
     download_behavior: DownloadBehavior,
     access_stats_behavior: AccessStatsBehavior,
     page_content_kind: PageContentKind,
     read_path_debug: bool,
+    perf_span: Option<PerfSpan>,
+    perf_span_dispatch: Option<Dispatch>,
 }
+
+impl std::fmt::Debug for RequestContext {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RequestContext")
+            .field("task_kind", &self.task_kind)
+            .field("download_behavior", &self.download_behavior)
+            .field("access_stats_behavior", &self.access_stats_behavior)
+            .field("page_content_kind", &self.page_content_kind)
+            .field("read_path_debug", &self.read_path_debug)
+            // perf_span and perf_span_dispatch are omitted on purpose
+            .finish()
+    }
+}

 /// The kind of access to the page cache.
@@ -157,24 +179,23 @@ impl RequestContextBuilder {
                 access_stats_behavior: AccessStatsBehavior::Update,
                 page_content_kind: PageContentKind::Unknown,
                 read_path_debug: false,
+                perf_span: None,
+                perf_span_dispatch: None,
             },
         }
     }

-    pub fn extend(original: &RequestContext) -> Self {
+    pub fn from(original: &RequestContext) -> Self {
         Self {
             // This is like a Copy, but avoid implementing Copy because ordinary users of
             // RequestContext should always move or ref it.
-            inner: RequestContext {
-                task_kind: original.task_kind,
-                download_behavior: original.download_behavior,
-                access_stats_behavior: original.access_stats_behavior,
-                page_content_kind: original.page_content_kind,
-                read_path_debug: original.read_path_debug,
-            },
+            inner: original.clone(),
         }
     }

     pub fn task_kind(mut self, b: TaskKind) -> Self {
         self.inner.task_kind = b;
         self
     }

     /// Configure the DownloadBehavior of the context: whether to
     /// download missing layers, and/or warn on the download.
     pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
@@ -199,7 +220,52 @@ impl RequestContextBuilder {
         self
     }

-    pub fn build(self) -> RequestContext {
+    pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
+        self.inner.perf_span_dispatch = dispatch;
+        self
+    }
+
+    pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
+    where
+        Fn: FnOnce() -> Span,
+    {
+        assert!(self.inner.perf_span.is_none());
+        assert!(self.inner.perf_span_dispatch.is_some());
+
+        let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
+        let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
+
+        self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
+
+        self
+    }
+
+    pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
+    where
+        Fn: FnOnce(&Span) -> Span,
+    {
+        if let Some(ref perf_span) = self.inner.perf_span {
+            assert!(self.inner.perf_span_dispatch.is_some());
+            let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
+
+            let new_span =
+                tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
+
+            self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
+        }
+
+        self
+    }
+
+    pub fn root(self) -> RequestContext {
+        self.inner
+    }
+
+    pub fn attached_child(self) -> RequestContext {
+        self.inner
+    }
+
+    pub fn detached_child(self) -> RequestContext {
         self.inner
     }
 }
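To illustrate the new builder flow (a sketch, not from the diff): a root context is built once with a dispatch, and children inherit the perf-span plumbing via `from`; the span names and the `perf_dispatch` variable are illustrative:

```rust
use tracing::info_span;

// Root context for a request, with perf-span plumbing attached.
let root_ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
    .download_behavior(DownloadBehavior::Download)
    .perf_span_dispatch(Some(perf_dispatch)) // assumed Dispatch from the OTEL guard
    .root_perf_span(|| info_span!(target: "P", "REQUEST"))
    .root();

// Child context: clones the parent (including its perf span) and nests a span.
let child_ctx = RequestContextBuilder::from(&root_ctx)
    .perf_span(|parent| info_span!(target: "P", parent: parent, "SUB_STEP"))
    .attached_child();
```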
@@ -220,7 +286,7 @@ impl RequestContext {
     pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
         RequestContextBuilder::new(task_kind)
             .download_behavior(download_behavior)
-            .build()
+            .root()
     }

     /// Create a detached child context for a task that may outlive `self`.
@@ -241,7 +307,10 @@ impl RequestContext {
     ///
     /// We could make new calls to this function fail if `self` is already canceled.
     pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
-        self.child_impl(task_kind, download_behavior)
+        RequestContextBuilder::from(self)
+            .task_kind(task_kind)
+            .download_behavior(download_behavior)
+            .detached_child()
     }

     /// Create a child of context `self` for a task that shall not outlive `self`.
@@ -265,7 +334,7 @@ impl RequestContext {
     /// The method to wait for child tasks would return an error, indicating
     /// that the child task was not started because the context was canceled.
     pub fn attached_child(&self) -> Self {
-        self.child_impl(self.task_kind(), self.download_behavior())
+        RequestContextBuilder::from(self).attached_child()
     }

     /// Use this function when you should be creating a child context using
@@ -280,10 +349,6 @@ impl RequestContext {
         Self::new(task_kind, download_behavior)
     }

-    fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
-        Self::new(task_kind, download_behavior)
-    }
-
     pub fn task_kind(&self) -> TaskKind {
         self.task_kind
     }
@@ -303,4 +368,51 @@ impl RequestContext {
     pub(crate) fn read_path_debug(&self) -> bool {
         self.read_path_debug
     }
+
+    pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
+        if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
+            span.inner().follows_from(from_span.inner());
+        }
+    }
+
+    pub(crate) fn maybe_instrument<'a, Fut, Fn>(
+        &self,
+        future: Fut,
+        make_span: Fn,
+    ) -> BoxFuture<'a, Fut::Output>
+    where
+        Fut: Future + Send + 'a,
+        Fn: FnOnce(&Span) -> Span,
+    {
+        match &self.perf_span {
+            Some(perf_span) => {
+                assert!(self.perf_span_dispatch.is_some());
+                let dispatcher = self.perf_span_dispatch.as_ref().unwrap();
+
+                let new_span =
+                    tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
+
+                let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
+                future.instrument(new_perf_span).boxed()
+            }
+            None => future.boxed(),
+        }
+    }
+
+    pub(crate) fn perf_span_record<
+        Q: tracing::field::AsField + ?Sized,
+        V: tracing::field::Value,
+    >(
+        &self,
+        field: &Q,
+        value: V,
+    ) {
+        if let Some(span) = &self.perf_span {
+            span.record(field, value);
+        }
+    }
+
+    pub(crate) fn has_perf_span(&self) -> bool {
+        self.perf_span.is_some()
+    }
 }
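A sketch of how call sites use `maybe_instrument` (illustrative, but it mirrors the real call sites later in this diff): when the context carries no perf span, the future is merely boxed, which keeps the disabled-sampling overhead to essentially one allocation:

```rust
// Hypothetical call site inside the pageserver crate.
let result = ctx
    .maybe_instrument(some_async_operation(), |current_perf_span| {
        tracing::info_span!(
            target: "P",
            parent: current_perf_span,
            "EXAMPLE_STAGE",
        )
    })
    .await;
```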
@@ -2579,9 +2579,10 @@ async fn getpage_at_lsn_handler_inner(
     let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;

     async {
-        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-        // Enable read path debugging
-        let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
+        let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
+            .download_behavior(DownloadBehavior::Download)
+            .read_path_debug(true)
+            .root();
         let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;

         // Use last_record_lsn if no lsn is provided
@@ -17,7 +17,7 @@ use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use pageserver_api::config::{
     PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
-    PageServiceProtocolPipelinedExecutionStrategy,
+    PageServiceProtocolPipelinedExecutionStrategy, Tracing,
 };
 use pageserver_api::key::rel_block_to_key;
 use pageserver_api::models::{
@@ -36,6 +36,7 @@ use postgres_ffi::BLCKSZ;
 use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
 use pq_proto::framed::ConnectionError;
 use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
+use rand::Rng;
 use strum_macros::IntoStaticStr;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
@@ -44,6 +45,7 @@ use tracing::*;
 use utils::auth::{Claims, Scope, SwappableJwtAuth};
 use utils::failpoint_support;
 use utils::id::{TenantId, TimelineId};
+use utils::logging::PERF_TRACE_TARGET;
 use utils::logging::log_slow;
 use utils::lsn::Lsn;
 use utils::simple_rcu::RcuReadGuard;
@@ -53,7 +55,7 @@ use utils::sync::spsc_fold;
 use crate::auth::check_permission;
 use crate::basebackup::BasebackupError;
 use crate::config::PageServerConf;
-use crate::context::{DownloadBehavior, RequestContext};
+use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
 use crate::metrics::{
     self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
 };
@@ -99,6 +101,7 @@ pub fn spawn(
     conf: &'static PageServerConf,
     tenant_manager: Arc<TenantManager>,
     pg_auth: Option<Arc<SwappableJwtAuth>>,
+    perf_trace_dispatch: Option<Dispatch>,
     tcp_listener: tokio::net::TcpListener,
 ) -> Listener {
     let cancel = CancellationToken::new();
@@ -116,6 +119,7 @@ pub fn spawn(
         conf,
         tenant_manager,
         pg_auth,
+        perf_trace_dispatch,
         tcp_listener,
         conf.pg_auth_type,
         conf.page_service_pipelining.clone(),
@@ -172,6 +176,7 @@ pub async fn libpq_listener_main(
     conf: &'static PageServerConf,
     tenant_manager: Arc<TenantManager>,
    auth: Option<Arc<SwappableJwtAuth>>,
+    perf_trace_dispatch: Option<Dispatch>,
     listener: tokio::net::TcpListener,
     auth_type: AuthType,
     pipelining_config: PageServicePipeliningConfig,
@@ -204,8 +209,12 @@ pub async fn libpq_listener_main(
         // Connection established. Spawn a new task to handle it.
         debug!("accepted connection from {}", peer_addr);
         let local_auth = auth.clone();
-        let connection_ctx = listener_ctx
-            .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
+        let connection_ctx = RequestContextBuilder::from(&listener_ctx)
+            .task_kind(TaskKind::PageRequestHandler)
+            .download_behavior(DownloadBehavior::Download)
+            .perf_span_dispatch(perf_trace_dispatch.clone())
+            .detached_child();

         connection_handler_tasks.spawn(page_service_conn_main(
             conf,
             tenant_manager.clone(),
@@ -600,6 +609,7 @@ impl std::fmt::Display for BatchedPageStreamError {
 struct BatchedGetPageRequest {
     req: PagestreamGetPageRequest,
     timer: SmgrOpTimer,
+    ctx: RequestContext,
 }

 #[cfg(feature = "testing")]
@@ -736,6 +746,7 @@ impl PageServerHandler {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         timeline_handles: &mut TimelineHandles,
+        tracing_config: Option<&Tracing>,
         cancel: &CancellationToken,
         ctx: &RequestContext,
         protocol_version: PagestreamProtocolVersion,
@@ -895,10 +906,55 @@ impl PageServerHandler {
         }

         let key = rel_block_to_key(req.rel, req.blkno);
-        let shard = match timeline_handles
-            .get(tenant_id, timeline_id, ShardSelector::Page(key))
-            .await
-        {
+
+        let sampled = match tracing_config {
+            Some(conf) => {
+                let ratio = &conf.sampling_ratio;
+
+                if ratio.numerator == 0 {
+                    false
+                } else {
+                    rand::thread_rng().gen_range(0..ratio.denominator) < ratio.numerator
+                }
+            }
+            None => false,
+        };
+
+        let get_page_context = if sampled {
+            RequestContextBuilder::from(ctx)
+                .root_perf_span(|| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        "GET_PAGE",
+                        tenant_id = %tenant_id,
+                        timeline_id = %timeline_id,
+                        lsn = %req.hdr.request_lsn,
+                        request_id = %req.hdr.reqid,
+                        key = %key)
+                })
+                .attached_child()
+        } else {
+            ctx.attached_child()
+        };
+
+        let res = get_page_context
+            .maybe_instrument(
+                timeline_handles.get(tenant_id, timeline_id, ShardSelector::Page(key)),
+                |current_perf_span| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        parent: current_perf_span,
+                        "SHARD_SELECTION",
+                        tenant_id = %tenant_id,
+                        timeline_id = %timeline_id,
+                        lsn = %req.hdr.request_lsn,
+                        request_id = %req.hdr.reqid
+                    )
+                },
+            )
+            .await;
+
+        let shard = match res {
             Ok(tl) => tl,
             Err(e) => {
                 let span = mkspan!(before shard routing);
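For intuition about the sampling rule above (a standalone sketch, not part of the diff): a uniform draw from `0..denominator` falls below `numerator` with probability `numerator/denominator`, so `1/1000` tags roughly one GET_PAGE request in a thousand with a root perf span:

```rust
use rand::Rng;

/// Illustrative restatement of the sampling decision above.
fn sampled(numerator: usize, denominator: usize) -> bool {
    // numerator == 0 disables sampling outright; otherwise the comparison
    // succeeds with probability numerator / denominator.
    numerator != 0 && rand::thread_rng().gen_range(0..denominator) < numerator
}
```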
@@ -927,24 +983,59 @@ impl PageServerHandler {
         };
         let span = mkspan!(shard.tenant_shard_id.shard_slug());

-        let timer = record_op_start_and_throttle(
-            &shard,
-            metrics::SmgrQueryType::GetPageAtLsn,
-            received_at,
-        )
-        .await?;
+        // TODO(vlad): why does this not show up?
+        get_page_context.perf_span_record(
+            "shard",
+            tracing::field::display(shard.get_shard_identity().shard_slug()),
+        );
+
+        let timer = get_page_context
+            .maybe_instrument(
+                record_op_start_and_throttle(
+                    &shard,
+                    metrics::SmgrQueryType::GetPageAtLsn,
+                    received_at,
+                ),
+                |current_perf_span| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        parent: current_perf_span,
+                        "THROTTLE",
+                        tenant_id = %tenant_id,
+                        timeline_id = %timeline_id,
+                        lsn = %req.hdr.request_lsn,
+                        request_id = %req.hdr.reqid
+                    )
+                },
+            )
+            .await?;

         // We're holding the Handle
-        let effective_request_lsn = match Self::wait_or_get_last_lsn(
-            &shard,
-            req.hdr.request_lsn,
-            req.hdr.not_modified_since,
-            &shard.get_applied_gc_cutoff_lsn(),
-            ctx,
-        )
-        // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
-        .await
-        {
+        let res = get_page_context
+            .maybe_instrument(
+                Self::wait_or_get_last_lsn(
+                    &shard,
+                    req.hdr.request_lsn,
+                    req.hdr.not_modified_since,
+                    &shard.get_applied_gc_cutoff_lsn(),
+                    ctx,
+                ),
+                |current_perf_span| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        parent: current_perf_span,
+                        "WAIT_LSN",
+                        tenant_id = %tenant_id,
+                        timeline_id = %timeline_id,
+                        lsn = %req.hdr.request_lsn,
+                        request_id = %req.hdr.reqid
+                    )
+                },
+            )
+            .await;
+
+        let effective_request_lsn = match res {
             Ok(lsn) => lsn,
             Err(e) => {
                 return respond_error!(span, e);
@@ -954,7 +1045,11 @@ impl PageServerHandler {
             span,
             shard: shard.downgrade(),
             effective_request_lsn,
-            pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
+            pages: smallvec::smallvec![BatchedGetPageRequest {
+                req,
+                timer,
+                ctx: get_page_context
+            }],
         }
     }
     #[cfg(feature = "testing")]
@@ -1486,12 +1581,15 @@ impl PageServerHandler {
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
         let cancel = self.cancel.clone();
+        let tracing_config = self.conf.tracing.clone();

         let err = loop {
             let msg = Self::pagestream_read_message(
                 &mut pgb_reader,
                 tenant_id,
                 timeline_id,
                 &mut timeline_handles,
+                tracing_config.as_ref(),
                 &cancel,
                 ctx,
                 protocol_version,
@@ -1625,6 +1723,8 @@ impl PageServerHandler {
         // Batcher
         //

+        let tracing_config = self.conf.tracing.clone();
+
         let cancel_batcher = self.cancel.child_token();
         let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
         let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1638,6 +1738,7 @@ impl PageServerHandler {
                 tenant_id,
                 timeline_id,
                 &mut timeline_handles,
+                tracing_config.as_ref(),
                 &cancel_batcher,
                 &ctx,
                 protocol_version,
@@ -1976,7 +2077,9 @@ impl PageServerHandler {

         let results = timeline
             .get_rel_page_at_lsn_batched(
-                requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
+                requests
+                    .iter()
+                    .map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
                 effective_lsn,
                 io_concurrency,
                 ctx,
@@ -31,15 +31,16 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
 use serde::{Deserialize, Serialize};
 use strum::IntoEnumIterator;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, info_span, trace, warn};
 use utils::bin_ser::{BeSer, DeserializeError};
+use utils::logging::PERF_TRACE_TARGET;
 use utils::lsn::Lsn;
 use utils::pausable_failpoint;
 use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};

 use super::tenant::{PageReconstructError, Timeline};
 use crate::aux_file;
-use crate::context::RequestContext;
+use crate::context::{RequestContext, RequestContextBuilder};
 use crate::keyspace::{KeySpace, KeySpaceAccum};
 use crate::metrics::{
     RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -209,7 +210,9 @@ impl Timeline {
         let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
         let res = self
             .get_rel_page_at_lsn_batched(
-                pages.iter().map(|(tag, blknum)| (tag, blknum)),
+                pages
+                    .iter()
+                    .map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
                 effective_lsn,
                 io_concurrency.clone(),
                 ctx,
@@ -248,7 +251,7 @@ impl Timeline {
     /// The ordering of the returned vec corresponds to the ordering of `pages`.
     pub(crate) async fn get_rel_page_at_lsn_batched(
         &self,
-        pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
+        pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
         effective_lsn: Lsn,
         io_concurrency: IoConcurrency,
         ctx: &RequestContext,
@@ -262,8 +265,11 @@ impl Timeline {
         let mut result = Vec::with_capacity(pages.len());
         let result_slots = result.spare_capacity_mut();

-        let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
-        for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
+        let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
+            BTreeMap::default();
+
+        let mut perf_instrument = false;
+        for (response_slot_idx, (tag, blknum, req_ctx)) in pages.enumerate() {
             if tag.relnode == 0 {
                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
                     RelationError::InvalidRelnode.into(),
@@ -273,6 +279,7 @@ impl Timeline {
                 continue;
             }

+            // TODO: perf span
             let nblocks = match self
                 .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
                 .await
@@ -297,8 +304,12 @@ impl Timeline {

             let key = rel_block_to_key(*tag, *blknum);

+            if req_ctx.has_perf_span() {
+                perf_instrument = true;
+            }
+
             let key_slots = keys_slots.entry(key).or_default();
-            key_slots.push(response_slot_idx);
+            key_slots.push((response_slot_idx, req_ctx));
         }

         let keyspace = {
@@ -314,16 +325,36 @@ impl Timeline {
             acc.to_keyspace()
         };

-        match self
-            .get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
-            .await
-        {
+        let get_vectored_ctx = match perf_instrument {
+            true => RequestContextBuilder::from(ctx)
+                .root_perf_span(|| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        "GET_VECTORED",
+                        tenant_id = %self.tenant_shard_id.tenant_id,
+                        timeline_id = %self.timeline_id,
+                        lsn = %effective_lsn,
+                        shard = %self.tenant_shard_id.shard_slug(),
+                    )
+                })
+                .attached_child(),
+            false => ctx.attached_child(),
+        };
+
+        let res = get_vectored_ctx
+            .maybe_instrument(
+                self.get_vectored(keyspace, effective_lsn, io_concurrency, &get_vectored_ctx),
+                |current_perf_span| current_perf_span.clone(),
+            )
+            .await;
+
+        match res {
             Ok(results) => {
                 for (key, res) in results {
                     let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
-                    let first_slot = key_slots.next().unwrap();
+                    let (first_slot, first_req_ctx) = key_slots.next().unwrap();

-                    for slot in key_slots {
+                    for (slot, req_ctx) in key_slots {
                         let clone = match &res {
                             Ok(buf) => Ok(buf.clone()),
                             Err(err) => Err(match err {
@@ -341,17 +372,19 @@ impl Timeline {
                         };

                         result_slots[slot].write(clone);
+                        req_ctx.perf_follows_from(&get_vectored_ctx);
                         slots_filled += 1;
                     }

                     result_slots[first_slot].write(res);
+                    first_req_ctx.perf_follows_from(&get_vectored_ctx);
                     slots_filled += 1;
                 }
             }
             Err(err) => {
                 // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
                 // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
-                for slot in keys_slots.values().flatten() {
+                for (slot, req_ctx) in keys_slots.values().flatten() {
                     // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
                     // but without taking ownership of the GetVectoredError
                     let err = match &err {
@@ -383,6 +416,7 @@ impl Timeline {
                         }
                     };

+                    req_ctx.perf_follows_from(&get_vectored_ctx);
                     result_slots[*slot].write(err);
                 }
@@ -217,9 +217,10 @@ pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
 pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
 pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
 pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
+pageserver_runtime!(OTEL_RUNTIME, "open telemetry worker");
 // Bump this number when adding a new pageserver_runtime!
 // SAFETY: it's obviously correct
-const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
+const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(5) };

 #[derive(Debug, Clone, Copy)]
 pub struct PageserverTaskId(u64);
@@ -5718,9 +5718,10 @@ pub(crate) mod harness {
             // enable it in case the tests exercise code paths that use
             // debug_assert_current_span_has_tenant_and_timeline_id
             logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
+            utils::logging::OtelEnablement::Disabled,
             logging::Output::Stdout,
         )
-        .expect("Failed to init test logging")
+        .expect("Failed to init test logging");
     });
 }
@@ -13,7 +13,6 @@ pub mod merge_iterator;
 use std::cmp::Ordering;
 use std::collections::hash_map::Entry;
 use std::collections::{BinaryHeap, HashMap};
-use std::future::Future;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -34,7 +33,8 @@ use pageserver_api::key::Key;
 use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
 use pageserver_api::record::NeonWalRecord;
 use pageserver_api::value::Value;
-use tracing::{Instrument, trace};
+use tracing::{Instrument, info_span, trace};
+use utils::logging::PERF_TRACE_TARGET;
 use utils::lsn::Lsn;
 use utils::sync::gate::GateGuard;
@@ -43,7 +43,7 @@ use super::PageReconstructError;
 use super::layer_map::InMemoryLayerDesc;
 use super::timeline::{GetVectoredError, ReadPath};
 use crate::config::PageServerConf;
-use crate::context::{AccessStatsBehavior, RequestContext};
+use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};

 pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
 where
@@ -874,13 +874,51 @@ impl ReadableLayer {
     ) -> Result<(), GetVectoredError> {
         match self {
             ReadableLayer::PersistentLayer(layer) => {
-                layer
-                    .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
+                let persistent_context = RequestContextBuilder::from(ctx)
+                    .perf_span(|crnt_perf_span| {
+                        info_span!(
+                            target: PERF_TRACE_TARGET,
+                            parent: crnt_perf_span,
+                            "PLAN_LAYER",
+                            layer = %layer
+                        )
+                    })
+                    .attached_child();
+
+                persistent_context
+                    .maybe_instrument(
+                        layer.get_values_reconstruct_data(
+                            keyspace,
+                            lsn_range,
+                            reconstruct_state,
+                            &persistent_context,
+                        ),
+                        |crnt_perf_span| crnt_perf_span.clone(),
+                    )
                     .await
             }
             ReadableLayer::InMemoryLayer(layer) => {
-                layer
-                    .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
+                let in_mem_context = RequestContextBuilder::from(ctx)
+                    .perf_span(|crnt_perf_span| {
+                        info_span!(
+                            target: PERF_TRACE_TARGET,
+                            parent: crnt_perf_span,
+                            "PLAN_LAYER",
+                            layer = %layer
+                        )
+                    })
+                    .attached_child();
+
+                in_mem_context
+                    .maybe_instrument(
+                        layer.get_values_reconstruct_data(
+                            keyspace,
+                            lsn_range,
+                            reconstruct_state,
+                            &in_mem_context,
+                        ),
+                        |crnt_perf_span| crnt_perf_span.clone(),
+                    )
                     .await
             }
         }
@@ -896,9 +896,9 @@ impl DeltaLayerInner {
     where
         Reader: BlockReader + Clone,
     {
-        let ctx = RequestContextBuilder::extend(ctx)
+        let ctx = RequestContextBuilder::from(ctx)
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
-            .build();
+            .attached_child();

         for range in keyspace.ranges.iter() {
             let mut range_end_handled = false;
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
                 all_keys.push(entry);
                 true
             },
-            &RequestContextBuilder::extend(ctx)
+            &RequestContextBuilder::from(ctx)
                 .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
-                .build(),
+                .attached_child(),
         )
         .await?;
         if let Some(last) = all_keys.last_mut() {
@@ -481,9 +481,9 @@ impl ImageLayerInner {
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

-        let ctx = RequestContextBuilder::extend(ctx)
+        let ctx = RequestContextBuilder::from(ctx)
             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
-            .build();
+            .attached_child();

         for range in keyspace.ranges.iter() {
             let mut range_end_handled = false;
@@ -420,9 +420,9 @@ impl InMemoryLayer {
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
-        let ctx = RequestContextBuilder::extend(ctx)
+        let ctx = RequestContextBuilder::from(ctx)
             .page_content_kind(PageContentKind::InMemoryLayer)
-            .build();
+            .attached_child();

         let inner = self.inner.read().await;
@@ -8,9 +8,10 @@ use camino::{Utf8Path, Utf8PathBuf};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::HistoricLayerInfo;
 use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
-use tracing::Instrument;
+use tracing::{Instrument, info_span};
 use utils::generation::Generation;
 use utils::id::TimelineId;
+use utils::logging::PERF_TRACE_TARGET;
 use utils::lsn::Lsn;
 use utils::sync::{gate, heavier_once_cell};
@@ -324,16 +325,29 @@ impl Layer {
         reconstruct_data: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
-        let downloaded =
-            self.0
-                .get_or_maybe_download(true, ctx)
-                .await
-                .map_err(|err| match err {
-                    DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
-                        GetVectoredError::Cancelled
-                    }
-                    other => GetVectoredError::Other(anyhow::anyhow!(other)),
-                })?;
+        let get_layer_context = RequestContextBuilder::from(ctx)
+            .perf_span(|crnt_perf_span| {
+                info_span!(
+                    target: PERF_TRACE_TARGET,
+                    parent: crnt_perf_span,
+                    "GET_LAYER",
+                )
+            })
+            .attached_child();
+
+        let downloaded = get_layer_context
+            .maybe_instrument(
+                self.0.get_or_maybe_download(true, &get_layer_context),
+                |crnt_perf_context| crnt_perf_context.clone(),
+            )
+            .await
+            .map_err(|err| match err {
+                DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
+                    GetVectoredError::Cancelled
+                }
+                other => GetVectoredError::Other(anyhow::anyhow!(other)),
+            })?;

         let this = ResidentLayer {
             downloaded: downloaded.clone(),
             owner: self.clone(),
@@ -341,9 +355,29 @@ impl Layer {

         self.record_access(ctx);

-        downloaded
-            .get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
-            .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
+        let visit_layer_context = RequestContextBuilder::from(ctx)
+            .perf_span(|crnt_perf_span| {
+                info_span!(
+                    target: PERF_TRACE_TARGET,
+                    parent: crnt_perf_span,
+                    "VISIT_LAYER",
+                )
+            })
+            .attached_child();
+
+        visit_layer_context
+            .maybe_instrument(
+                downloaded
+                    .get_values_reconstruct_data(
+                        this,
+                        keyspace,
+                        lsn_range,
+                        reconstruct_data,
+                        &visit_layer_context,
+                    )
+                    .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)),
+                |crnt_perf_span| crnt_perf_span.clone(),
+            )
             .await
             .map_err(|err| match err {
                 GetVectoredError::Other(err) => GetVectoredError::Other(
@@ -1045,15 +1079,36 @@ impl LayerInner {
             return Err(DownloadError::DownloadRequired);
         }
 
-        let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
+        let download_ctx = if ctx.has_perf_span() {
+            let dl_ctx = RequestContextBuilder::from(ctx)
+                .task_kind(TaskKind::LayerDownload)
+                .download_behavior(DownloadBehavior::Download)
+                .root_perf_span(|| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        "DOWNLOAD_LAYER",
+                        layer = %self,
+                        reason = %reason
+                    )
+                })
+                .detached_child();
+            ctx.perf_follows_from(&dl_ctx);
+            dl_ctx
+        } else {
+            ctx.attached_child()
+        };
 
         async move {
             tracing::info!(%reason, "downloading on-demand");
 
             let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
 
-            let res = self
-                .download_init_and_wait(timeline, permit, download_ctx)
+            let res = download_ctx
+                .maybe_instrument(
+                    self.download_init_and_wait(timeline, permit, download_ctx.attached_child()),
+                    |crnt_perf_span| crnt_perf_span.clone(),
+                )
                 .await?;
 
             scopeguard::ScopeGuard::into_inner(init_cancelled);
             Ok(res)
         }
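
`DOWNLOAD_LAYER` is deliberately created as a root perf span rather than a child: an on-demand download is triggered by one request but may outlive it and be awaited by others, so parenting it under the triggering request would distort that request's timing. The diff records the causal link with `perf_follows_from` instead; a sketch of the same linkage with plain `tracing` spans (span name reused from the diff, everything else illustrative):

    use tracing::{Span, info_span};

    fn start_download_span(request_span: &Span) -> Span {
        // `parent: None` detaches the span from the current context,
        // making it the root of its own trace.
        let download = info_span!(parent: None, "DOWNLOAD_LAYER");
        // Link it back to the request that triggered the download
        // without making it a child of that request.
        download.follows_from(request_span);
        download
    }
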
@@ -1720,9 +1775,9 @@ impl DownloadedLayer {
         );
 
         let res = if owner.desc.is_delta {
-            let ctx = RequestContextBuilder::extend(ctx)
+            let ctx = RequestContextBuilder::from(ctx)
                 .page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
-                .build();
+                .attached_child();
             let summary = Some(delta_layer::Summary::expected(
                 owner.desc.tenant_shard_id.tenant_id,
                 owner.desc.timeline_id,
@@ -1738,9 +1793,9 @@ impl DownloadedLayer {
             .await
             .map(LayerKind::Delta)
         } else {
-            let ctx = RequestContextBuilder::extend(ctx)
+            let ctx = RequestContextBuilder::from(ctx)
                 .page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
-                .build();
+                .attached_child();
             let lsn = owner.desc.image_layer_lsn();
             let summary = Some(image_layer::Summary::expected(
                 owner.desc.tenant_shard_id.tenant_id,
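
The two hunks above and the test hunks below are a mechanical API migration: `RequestContextBuilder::extend(ctx) ... .build()` becomes `RequestContextBuilder::from(ctx) ... .attached_child()`. The same rename repeats in `timeline.rs` and its tests further down.
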
@@ -644,9 +644,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
         .unwrap();
 
     // This test does downloads
-    let ctx = RequestContextBuilder::extend(&ctx)
+    let ctx = RequestContextBuilder::from(&ctx)
         .download_behavior(DownloadBehavior::Download)
-        .build();
+        .attached_child();
 
     let layer = {
         let mut layers = {
@@ -729,9 +729,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
         .unwrap();
 
     // This test does downloads
-    let ctx = RequestContextBuilder::extend(&ctx)
+    let ctx = RequestContextBuilder::from(&ctx)
        .download_behavior(DownloadBehavior::Download)
-        .build();
+        .attached_child();
 
     let layer = {
         let mut layers = {

@@ -67,6 +67,7 @@ use tracing::*;
 use utils::generation::Generation;
 use utils::guard_arc_swap::GuardArcSwap;
 use utils::id::TimelineId;
+use utils::logging::PERF_TRACE_TARGET;
 use utils::lsn::{AtomicLsn, Lsn, RecordLsn};
 use utils::postgres_client::PostgresClientProtocol;
 use utils::rate_limit::RateLimit;
@@ -94,7 +95,7 @@ use super::{
 };
 use crate::aux_file::AuxFileSizeEstimator;
 use crate::config::PageServerConf;
-use crate::context::{DownloadBehavior, RequestContext};
+use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
 use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
 use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::l0_flush::{self, L0FlushGlobalState};
@@ -1274,9 +1275,28 @@ impl Timeline {
         };
         reconstruct_state.read_path = read_path;
 
-        let traversal_res: Result<(), _> = self
-            .get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
+        let plan_context = RequestContextBuilder::from(ctx)
+            .perf_span(|crnt_perf_span| {
+                info_span!(
+                    target: PERF_TRACE_TARGET,
+                    parent: crnt_perf_span,
+                    "PLAN_IO",
+                )
+            })
+            .attached_child();
+
+        let traversal_res: Result<(), _> = plan_context
+            .maybe_instrument(
+                self.get_vectored_reconstruct_data(
+                    keyspace.clone(),
+                    lsn,
+                    reconstruct_state,
+                    &plan_context,
+                ),
+                |crnt_perf_span| crnt_perf_span.clone(),
+            )
             .await;
 
         if let Err(err) = traversal_res {
             // Wait for all the spawned IOs to complete.
             // See comments on `spawn_io` inside `storage_layer` for more details.
@@ -1290,14 +1310,45 @@ impl Timeline {
 
         let layers_visited = reconstruct_state.get_layers_visited();
 
+        let execute_context = RequestContextBuilder::from(ctx)
+            .perf_span(|crnt_perf_span| {
+                info_span!(
+                    target: PERF_TRACE_TARGET,
+                    parent: crnt_perf_span,
+                    "RECONSTRUCT",
+                )
+            })
+            .attached_child();
+
         let futs = FuturesUnordered::new();
         for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
             futs.push({
                 let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
+                let execute_key_context = RequestContextBuilder::from(&execute_context)
+                    .perf_span(|crnt_perf_span| {
+                        info_span!(
+                            target: PERF_TRACE_TARGET,
+                            parent: crnt_perf_span,
+                            "RECONSTRUCT_KEY",
+                            key = %key,
+                        )
+                    })
+                    .attached_child();
 
                 async move {
                     assert_eq!(state.situation, ValueReconstructSituation::Complete);
 
-                    let converted = match state.collect_pending_ios().await {
+                    let res = execute_key_context
+                        .maybe_instrument(state.collect_pending_ios(), |crnt_perf_span| {
+                            info_span!(
+                                target: PERF_TRACE_TARGET,
+                                parent: crnt_perf_span,
+                                "EXECUTE_IO",
+                            )
+                        })
+                        .await;
+
+                    let converted = match res {
                         Ok(ok) => ok,
                         Err(err) => {
                             return (key, Err(err));
@@ -1314,16 +1365,31 @@ impl Timeline {
                         "{converted:?}"
                     );
 
-                    (
-                        key,
-                        walredo_self.reconstruct_value(key, lsn, converted).await,
-                    )
+                    let walredo_deltas = converted.num_deltas();
+                    let walredo_res = execute_key_context
+                        .maybe_instrument(
+                            walredo_self.reconstruct_value(key, lsn, converted),
+                            |crnt_perf_span| {
+                                info_span!(
+                                    target: PERF_TRACE_TARGET,
+                                    parent: crnt_perf_span,
+                                    "WALREDO",
+                                    deltas = %walredo_deltas,
+                                )
+                            },
+                        )
+                        .await;
+
+                    (key, walredo_res)
                 }
             });
         }
 
-        let results = futs
-            .collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
+        let results = execute_context
+            .maybe_instrument(
+                futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>(),
+                |crnt_perf_span| crnt_perf_span.clone(),
+            )
             .await;
 
         // For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -3795,18 +3861,34 @@ impl Timeline {
             return Err(GetVectoredError::Cancelled);
         }
 
+        let plan_context = RequestContextBuilder::from(ctx)
+            .perf_span(|crnt_perf_span| {
+                info_span!(
+                    target: PERF_TRACE_TARGET,
+                    parent: crnt_perf_span,
+                    "PLAN_IO_TIMELINE",
+                    timeline = %timeline.timeline_id,
+                    lsn = %cont_lsn,
+                )
+            })
+            .attached_child();
+
         let TimelineVisitOutcome {
             completed_keyspace: completed,
             image_covered_keyspace,
-        } = Self::get_vectored_reconstruct_data_timeline(
-            timeline,
-            keyspace.clone(),
-            cont_lsn,
-            reconstruct_state,
-            &self.cancel,
-            ctx,
-        )
-        .await?;
+        } = plan_context
+            .maybe_instrument(
+                Self::get_vectored_reconstruct_data_timeline(
+                    timeline,
+                    keyspace.clone(),
+                    cont_lsn,
+                    reconstruct_state,
+                    &self.cancel,
+                    &plan_context,
+                ),
+                |crnt_perf_span| crnt_perf_span.clone(),
+            )
+            .await?;
 
         keyspace.remove_overlapping_with(&completed);
 
@@ -3850,8 +3932,26 @@ impl Timeline {
 
             // Take the min to avoid reconstructing a page with data newer than request Lsn.
             cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
-            timeline_owned = timeline
-                .get_ready_ancestor_timeline(ancestor_timeline, ctx)
+
+            let get_ancestor_context = RequestContextBuilder::from(ctx)
+                .perf_span(|crnt_perf_span| {
+                    info_span!(
+                        target: PERF_TRACE_TARGET,
+                        parent: crnt_perf_span,
+                        "GET_ANCESTOR",
+                        timeline = %timeline.timeline_id,
+                        lsn = %cont_lsn,
+                        ancestor = %ancestor_timeline.timeline_id,
+                        ancestor_lsn = %timeline.ancestor_lsn
+                    )
+                })
+                .attached_child();
+
+            timeline_owned = get_ancestor_context
+                .maybe_instrument(
+                    timeline.get_ready_ancestor_timeline(ancestor_timeline, &get_ancestor_context),
+                    |crnt_perf_span| crnt_perf_span.clone(),
+                )
                 .await?;
             timeline = &*timeline_owned;
         };
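
Taken together, the spans added in `timeline.rs` and `layer.rs` form one perf trace per sampled read. Judging by the parent context each span is built from, the hierarchy is roughly:

    (sampled request root)
    ├── PLAN_IO
    │   ├── PLAN_IO_TIMELINE          (one per visited timeline)
    │   │   └── GET_LAYER / VISIT_LAYER
    │   └── GET_ANCESTOR              (when traversal crosses into an ancestor)
    └── RECONSTRUCT
        └── RECONSTRUCT_KEY           (one per key)
            ├── EXECUTE_IO
            └── WALREDO
    DOWNLOAD_LAYER                    (separate root, linked via follows-from)
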
@@ -7184,9 +7284,9 @@ mod tests {
 
         eprintln!("Downloading {layer} and re-generating heatmap");
 
-        let ctx = &RequestContextBuilder::extend(&ctx)
+        let ctx = &RequestContextBuilder::from(&ctx)
             .download_behavior(crate::context::DownloadBehavior::Download)
-            .build();
+            .attached_child();
 
         let _resident = layer
             .download_and_keep_resident(ctx)

@@ -928,9 +928,9 @@ impl Timeline {
         {
             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
-                let image_ctx = RequestContextBuilder::extend(ctx)
+                let image_ctx = RequestContextBuilder::from(ctx)
                     .access_stats_behavior(AccessStatsBehavior::Skip)
-                    .build();
+                    .attached_child();
 
                 let mut partitioning = dense_partitioning;
                 partitioning

@@ -46,7 +46,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
             .expect("this should be a valid filter directive"),
     );
 
-    let otlp_layer = tracing_utils::init_tracing("proxy").await;
+    let otlp_layer =
+        tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
 
     let json_log_layer = if logfmt == LogFormat::Json {
         Some(JsonLoggingLayer::new(
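
`init_tracing` now takes an export configuration explicitly; passing `tracing_utils::ExportConfig::default()` here presumably preserves the proxy's previous behavior, while the pageserver can feed its new `tracing.export-config` settings (endpoint, protocol, timeout) through the same parameter.
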
@@ -255,6 +255,7 @@ async fn main() -> anyhow::Result<()> {
     logging::init(
         LogFormat::from_config(&args.log_format)?,
         logging::TracingErrorLayerEnablement::Disabled,
+        utils::logging::OtelEnablement::Disabled,
         logging::Output::Stdout,
     )?;
     logging::replace_panic_hook_with_tracing_panic_hook().forget();

@@ -643,6 +643,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     logging::init(
         LogFormat::from_config(&args.log_format)?,
         logging::TracingErrorLayerEnablement::Disabled,
+        utils::logging::OtelEnablement::Disabled,
         logging::Output::Stdout,
     )?;
     logging::replace_panic_hook_with_tracing_panic_hook().forget();

@@ -232,6 +232,7 @@ fn main() -> anyhow::Result<()> {
     logging::init(
         LogFormat::Plain,
         logging::TracingErrorLayerEnablement::Disabled,
+        utils::logging::OtelEnablement::Disabled,
         logging::Output::Stdout,
     )?;
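
These call sites thread a new OTEL-enablement argument through `utils::logging::init`. Only the `Disabled` variant is visible in this diff; a sketch of the gating idea, with a hypothetical `Enabled` payload:

    // Hypothetical shape; only `Disabled` appears in the diff.
    enum OtelEnablement {
        Disabled,
        Enabled { service_name: String },
    }

    fn otlp_service(otel: &OtelEnablement) -> Option<&str> {
        match otel {
            // Binaries that opt out build no OTLP export layer at all.
            OtelEnablement::Disabled => None,
            // Otherwise an exporter would be initialized for this service.
            OtelEnablement::Enabled { service_name } => Some(service_name),
        }
    }
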
|
||||
@@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum):
|
||||
raise ValueError(f"Unknown protocol type: {proto}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverTracingConfig:
|
||||
sampling_ratio: tuple[int, int]
|
||||
endpoint: str
|
||||
protocol: str
|
||||
timeout: str
|
||||
|
||||
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
|
||||
value = {
|
||||
"sampling-ratio": {
|
||||
"numerator": self.sampling_ratio[0],
|
||||
"denominator": self.sampling_ratio[1],
|
||||
},
|
||||
"export-config": {
|
||||
"endpoint": self.endpoint,
|
||||
"protocol": self.protocol,
|
||||
"timeout": self.timeout,
|
||||
},
|
||||
}
|
||||
return ("tracing", value)
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
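
For reference, the `("tracing", value)` pair returned above lands in `pageserver.toml` as a `tracing` table. Rendered as TOML (a sketch assuming the fixture's dict is written through unchanged, with field names matching the pageserver's kebab-case serde config):

    [tracing.sampling-ratio]
    numerator = 0
    denominator = 1000

    [tracing.export-config]
    endpoint = "http://localhost:4318/v1/traces"
    protocol = "http-binary"
    timeout = "10s"

The values shown are the ones the pagebench benchmark below passes in.
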
@@ -425,6 +447,7 @@ class NeonEnvBuilder:
         pageserver_virtual_file_io_mode: str | None = None,
         pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
         pageserver_get_vectored_concurrent_io: str | None = None,
+        pageserver_tracing_config: PageserverTracingConfig | None = None,
     ):
         self.repo_dir = repo_dir
         self.rust_log_override = rust_log_override
@@ -468,6 +491,8 @@ class NeonEnvBuilder:
             pageserver_get_vectored_concurrent_io
         )
 
+        self.pageserver_tracing_config = pageserver_tracing_config
+
         self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
             pageserver_default_tenant_config_compaction_algorithm
         )
@@ -1113,6 +1138,7 @@ class NeonEnv:
         self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
         self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
         self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
+        self.pageserver_tracing_config = config.pageserver_tracing_config
 
         # Create the neon_local's `NeonLocalInitConf`
         cfg: dict[str, Any] = {
@@ -1225,6 +1251,14 @@ class NeonEnv:
                 if key not in ps_cfg:
                     ps_cfg[key] = value
 
+            if self.pageserver_tracing_config is not None:
+                key, value = self.pageserver_tracing_config.to_config_key_value()
+
+                if key not in ps_cfg:
+                    ps_cfg[key] = value
+
+                ps_cfg[key] = value
+
             # Create a corresponding NeonPageserver object
             self.pageservers.append(
                 NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
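
Note the net effect of the merge above: the `if key not in ps_cfg` guard is immediately followed by an unconditional `ps_cfg[key] = value`, so the guard is dead code and the builder's tracing config always overwrites whatever `ps_cfg` already holds for that key.
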
@@ -106,6 +106,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
     ".*delaying layer flush by \\S+ for compaction backpressure.*",
     ".*stalling layer flushes for compaction backpressure.*",
     ".*layer roll waiting for flush due to compaction backpressure.*",
+    ".*BatchSpanProcessor.*",
 )

@@ -10,6 +10,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
+    PageserverTracingConfig,
     PgBin,
     wait_for_last_flush_lsn,
 )
@@ -113,6 +114,15 @@ def setup_and_run_pagebench_benchmark(
     neon_env_builder.pageserver_config_override = (
         f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
     )
 
+    tracing_config = PageserverTracingConfig(
+        sampling_ratio=(0, 1000),
+        endpoint="http://localhost:4318/v1/traces",
+        protocol="http-binary",
+        timeout="10s",
+    )
+    neon_env_builder.pageserver_tracing_config = tracing_config
+    ratio = tracing_config.sampling_ratio[0] / tracing_config.sampling_ratio[1]
+
     params.update(
         {
             "pageserver_config_override.page_cache_size": (
@@ -120,6 +130,7 @@ def setup_and_run_pagebench_benchmark(
                 {"unit": "byte"},
             ),
             "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
+            "pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
         }
     )
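
With `sampling_ratio=(0, 1000)`, the ratio recorded as a benchmark parameter works out to 0 / 1000 = 0.0: the tracing pipeline is configured but no requests are sampled, which presumably benchmarks the overhead of the instrumentation hooks alone.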