PoC: pass opentelemetry trace id to the pageserver basebackup command

This gives cross-service tracing of the basebackup operation that's
part of compute start.
This commit is contained in:
Heikki Linnakangas
2025-03-11 15:30:35 +02:00
parent 2294d52fa7
commit 4203cef955
5 changed files with 46 additions and 0 deletions

2
Cargo.lock generated
View File

@@ -4259,6 +4259,7 @@ dependencies = [
"num-traits",
"num_cpus",
"once_cell",
"opentelemetry",
"pageserver_api",
"pageserver_client",
"pageserver_compaction",
@@ -4301,6 +4302,7 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-utils",
"url",
"utils",

View File

@@ -855,6 +855,23 @@ impl ComputeNode {
info!("Storage auth token not set");
}
let mut traceparams = HashMap::new();
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let cx: opentelemetry::Context = tracing::Span::current().context();
let span = cx.span();
let span_context = span.span_context();
info!("span cxt: is_valid: {} is_sampled: {} trace_id: {}",
span_context.is_valid(), span_context.is_sampled(), span_context.trace_id());
TraceContextPropagator::new().inject_context(&cx, &mut traceparams);
config.options(&serde_json::to_string(&traceparams).expect("failed to serialize otel trace params"));
info!("getting basebackup with config {:?}", config);
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;

View File

@@ -167,6 +167,9 @@ pub fn init(
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// Initialize OpenTelemetry
let otlp_layer =
tracing_utils::init_tracing_without_runtime("pageserver-x", tracing_utils::ExportConfig::default());
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
@@ -188,6 +191,7 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(otlp_layer);
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),

View File

@@ -39,6 +39,7 @@ nix.workspace = true
num_cpus.workspace = true
num-traits.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
@@ -66,6 +67,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-utils.workspace = true
url.workspace = true
walkdir.workspace = true

View File

@@ -2,6 +2,7 @@
//! requests.
use std::borrow::Cow;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::str::FromStr;
@@ -2630,6 +2631,26 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
info!("got options: {options:?}");
match serde_json::from_str::<HashMap<String, String>>(&options) {
Ok(traceoptions) => {
if !traceoptions.is_empty() {
use opentelemetry;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let parent_ctx =
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&traceoptions));
Span::current().set_parent(parent_ctx);
info!("installed span parent");
}
}
Err(_) => {
error!("could not deserialize 'options', ignoring");
}
}
}
};
Ok(())