fix: dynamic reload tracing layer loses trace id (#7257)

* not working

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* works

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2025-11-19 14:16:56 +08:00
committed by GitHub
parent 8b7b5c17c7
commit 5d8819e7af
2 changed files with 183 additions and 42 deletions

View File

@@ -16,7 +16,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::io::IsTerminal; use std::io::IsTerminal;
use std::sync::{Arc, Mutex, Once}; use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::Duration; use std::time::Duration;
use common_base::serde::empty_string_as_default; use common_base::serde::empty_string_as_default;
@@ -28,6 +28,8 @@ use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{Sampler, Tracer}; use opentelemetry_sdk::trace::{Sampler, Tracer};
use opentelemetry_semantic_conventions::resource; use opentelemetry_semantic_conventions::resource;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::callsite;
use tracing::metadata::LevelFilter;
use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer; use tracing_log::LogTracer;
@@ -52,15 +54,176 @@ pub const DEFAULT_LOGGING_DIR: &str = "logs";
pub static LOG_RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> = pub static LOG_RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
OnceCell::new(); OnceCell::new();
type TraceReloadHandle = tracing_subscriber::reload::Handle< type DynSubscriber = Layered<tracing_subscriber::reload::Layer<Targets, Registry>, Registry>;
Vec< type OtelTraceLayer = tracing_opentelemetry::OpenTelemetryLayer<DynSubscriber, Tracer>;
tracing_opentelemetry::OpenTelemetryLayer<
Layered<tracing_subscriber::reload::Layer<Targets, Registry>, Registry>, #[derive(Clone)]
Tracer, pub struct TraceReloadHandle {
>, inner: Arc<RwLock<Option<OtelTraceLayer>>>,
>, }
Layered<tracing_subscriber::reload::Layer<Targets, Registry>, Registry>,
>; impl TraceReloadHandle {
fn new(inner: Arc<RwLock<Option<OtelTraceLayer>>>) -> Self {
Self { inner }
}
/// Replaces the currently installed layer with `new_layer`.
///
/// Passing `None` removes the layer; passing `Some(layer)` installs it.
/// After the swap the callsite interest cache is rebuilt.
///
/// # Panics
///
/// Panics if the lock guarding the layer slot is poisoned.
pub fn reload(&self, new_layer: Option<OtelTraceLayer>) {
    {
        // Keep the write lock only for the swap itself; it is released at
        // the end of this scope, before the interest cache is rebuilt.
        let mut slot = self.inner.write().unwrap();
        *slot = new_layer;
    }
    callsite::rebuild_interest_cache();
}
}
/// A tracing layer that can be dynamically reloaded.
///
/// Mostly copied from [`tracing_subscriber::reload::Layer`].
///
/// The wrapped OpenTelemetry layer is optional: `None` means no layer is
/// currently installed, `Some` holds the layer that callbacks are forwarded to.
struct TraceLayer {
    // Slot holding the current layer. `TraceLayer::new` gives a clone of this
    // `Arc` to the `TraceReloadHandle` it returns.
    inner: Arc<RwLock<Option<OtelTraceLayer>>>,
}
impl TraceLayer {
    /// Builds a reloadable layer seeded with `initial`, together with the
    /// handle that can later swap the layer in or out.
    ///
    /// Both returned values share one slot through the same `Arc`.
    fn new(initial: Option<OtelTraceLayer>) -> (Self, TraceReloadHandle) {
        let shared = Arc::new(RwLock::new(initial));
        let handle = TraceReloadHandle::new(Arc::clone(&shared));
        (Self { inner: shared }, handle)
    }

    /// Runs `f` against the installed layer under a read lock.
    ///
    /// Returns `None` when the lock cannot be acquired (poisoned) or when no
    /// layer is installed; otherwise returns `Some` with the result of `f`.
    fn with_layer<R>(&self, f: impl FnOnce(&OtelTraceLayer) -> R) -> Option<R> {
        let slot = self.inner.read().ok()?;
        slot.as_ref().map(f)
    }

    /// Runs `f` against the installed layer under a write lock, giving it
    /// mutable access.
    ///
    /// Returns `None` when the lock cannot be acquired (poisoned) or when no
    /// layer is installed; otherwise returns `Some` with the result of `f`.
    fn with_layer_mut<R>(&self, f: impl FnOnce(&mut OtelTraceLayer) -> R) -> Option<R> {
        let mut slot = self.inner.write().ok()?;
        slot.as_mut().map(f)
    }
}
/// Forwards every [`tracing_subscriber::Layer`] callback to the currently
/// installed OpenTelemetry layer, if there is one.
///
/// When no layer is installed (or the lock is poisoned) the notification
/// callbacks are no-ops, and the filtering callbacks answer permissively so
/// that this layer never suppresses spans or events for the rest of the
/// subscriber stack.
impl tracing_subscriber::Layer<DynSubscriber> for TraceLayer {
    fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) {
        let _ = self.with_layer(|layer| layer.on_register_dispatch(subscriber));
    }

    fn on_layer(&mut self, subscriber: &mut DynSubscriber) {
        let _ = self.with_layer_mut(|layer| layer.on_layer(subscriber));
    }

    /// Delegates to the installed layer; without one, reports
    /// `Interest::always` so the decision is left to the other layers.
    fn register_callsite(
        &self,
        metadata: &'static tracing::Metadata<'static>,
    ) -> tracing::subscriber::Interest {
        self.with_layer(|layer| layer.register_callsite(metadata))
            .unwrap_or_else(tracing::subscriber::Interest::always)
    }

    /// Delegates to the installed layer; without one, returns `true`.
    ///
    /// A layer's `enabled` result is a global verdict for the whole layered
    /// subscriber, not just for this layer. Answering `false` while tracing
    /// is switched off would therefore veto the span/event for every other
    /// layer as well, so the fallback must be `true` (consistent with the
    /// `Interest::always` fallback in `register_callsite`).
    fn enabled(
        &self,
        metadata: &tracing::Metadata<'_>,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) -> bool {
        self.with_layer(|layer| layer.enabled(metadata, ctx))
            .unwrap_or(true)
    }

    fn on_new_span(
        &self,
        attrs: &tracing::span::Attributes<'_>,
        id: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_new_span(attrs, id, ctx));
    }

    /// Delegates to the installed layer; without one, gives no hint (`None`).
    fn max_level_hint(&self) -> Option<LevelFilter> {
        self.with_layer(|layer| layer.max_level_hint()).flatten()
    }

    fn on_record(
        &self,
        span: &tracing::span::Id,
        values: &tracing::span::Record<'_>,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_record(span, values, ctx));
    }

    fn on_follows_from(
        &self,
        span: &tracing::span::Id,
        follows: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_follows_from(span, follows, ctx));
    }

    /// Delegates to the installed layer; without one, returns `true`.
    ///
    /// As with `enabled`, a `false` here drops the event for the entire
    /// subscriber stack, so an absent layer must not object.
    fn event_enabled(
        &self,
        event: &tracing::Event<'_>,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) -> bool {
        self.with_layer(|layer| layer.event_enabled(event, ctx))
            .unwrap_or(true)
    }

    fn on_event(
        &self,
        event: &tracing::Event<'_>,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_event(event, ctx));
    }

    fn on_enter(
        &self,
        id: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_enter(id, ctx));
    }

    fn on_exit(
        &self,
        id: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_exit(id, ctx));
    }

    fn on_close(
        &self,
        id: tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_close(id, ctx));
    }

    fn on_id_change(
        &self,
        old: &tracing::span::Id,
        new: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
    ) {
        let _ = self.with_layer(|layer| layer.on_id_change(old, new, ctx));
    }

    /// Forwards the downcast request to the installed layer.
    ///
    /// Returns `None` when the lock is poisoned or no layer is installed.
    ///
    /// # Safety
    ///
    /// Same contract as [`tracing_subscriber::Layer::downcast_raw`].
    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
        // NOTE(review): the returned raw pointer refers into the layer stored
        // inside the lock, but the read guard is released before the pointer
        // is handed back. A concurrent `reload` could replace the layer while
        // a caller still uses the pointer — confirm this is acceptable.
        self.inner.read().ok().and_then(|guard| {
            guard
                .as_ref()
                // SAFETY: the call is forwarded unchanged; the caller of this
                // method upholds the `downcast_raw` contract.
                .and_then(|layer| unsafe { layer.downcast_raw(id) })
        })
    }
}
/// Handle for reloading trace level /// Handle for reloading trace level
pub static TRACE_RELOAD_HANDLE: OnceCell<TraceReloadHandle> = OnceCell::new(); pub static TRACE_RELOAD_HANDLE: OnceCell<TraceReloadHandle> = OnceCell::new();
@@ -407,13 +570,11 @@ pub fn init_global_logging(
.set(Mutex::new(trace_state)) .set(Mutex::new(trace_state))
.expect("trace state already initialized"); .expect("trace state already initialized");
let initial_trace_layers = initial_tracer let initial_trace_layer = initial_tracer
.as_ref() .as_ref()
.map(|tracer| vec![tracing_opentelemetry::layer().with_tracer(tracer.clone())]) .map(|tracer| tracing_opentelemetry::layer().with_tracer(tracer.clone()));
.unwrap_or_else(Vec::new);
let (dyn_trace_layer, trace_reload_handle) = let (dyn_trace_layer, trace_reload_handle) = TraceLayer::new(initial_trace_layer);
tracing_subscriber::reload::Layer::new(initial_trace_layers);
TRACE_RELOAD_HANDLE TRACE_RELOAD_HANDLE
.set(trace_reload_handle) .set(trace_reload_handle)

View File

@@ -14,7 +14,7 @@
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::response::IntoResponse; use axum::response::IntoResponse;
use common_telemetry::{TRACE_RELOAD_HANDLE, error, get_or_init_tracer, info}; use common_telemetry::{TRACE_RELOAD_HANDLE, get_or_init_tracer, info};
use crate::error::{InvalidParameterSnafu, Result}; use crate::error::{InvalidParameterSnafu, Result};
@@ -43,32 +43,12 @@ pub async fn dyn_trace_handler(enable_str: String) -> Result<impl IntoResponse>
}; };
let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer);
match trace_reload_handle.reload(vec![trace_layer]) { trace_reload_handle.reload(Some(trace_layer));
Ok(_) => { info!("trace enabled");
info!("trace enabled"); Ok((StatusCode::OK, "trace enabled".to_string()))
Ok((StatusCode::OK, "trace enabled".to_string()))
}
Err(e) => {
error!(e; "Failed to enable trace");
Ok((
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to enable trace: {e}"),
))
}
}
} else { } else {
match trace_reload_handle.reload(vec![]) { trace_reload_handle.reload(None);
Ok(_) => { info!("trace disabled");
info!("trace disabled"); Ok((StatusCode::OK, "trace disabled".to_string()))
Ok((StatusCode::OK, "trace disabled".to_string()))
}
Err(e) => {
error!(e; "Failed to disable trace");
Ok((
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to disable trace: {e}"),
))
}
}
} }
} }