diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 20b519b62f..326bcc7f19 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::env; use std::io::IsTerminal; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::Duration; use common_base::serde::empty_string_as_default; @@ -28,6 +28,8 @@ use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{Sampler, Tracer}; use opentelemetry_semantic_conventions::resource; use serde::{Deserialize, Serialize}; +use tracing::callsite; +use tracing::metadata::LevelFilter; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_log::LogTracer; @@ -52,15 +54,176 @@ pub const DEFAULT_LOGGING_DIR: &str = "logs"; pub static LOG_RELOAD_HANDLE: OnceCell> = OnceCell::new(); -type TraceReloadHandle = tracing_subscriber::reload::Handle< - Vec< - tracing_opentelemetry::OpenTelemetryLayer< - Layered, Registry>, - Tracer, - >, - >, - Layered, Registry>, ->; +type DynSubscriber = Layered, Registry>; +type OtelTraceLayer = tracing_opentelemetry::OpenTelemetryLayer; + +#[derive(Clone)] +pub struct TraceReloadHandle { + inner: Arc>>, +} + +impl TraceReloadHandle { + fn new(inner: Arc>>) -> Self { + Self { inner } + } + + pub fn reload(&self, new_layer: Option) { + let mut guard = self.inner.write().unwrap(); + *guard = new_layer; + drop(guard); + + callsite::rebuild_interest_cache(); + } +} + +/// A tracing layer that can be dynamically reloaded. +/// +/// Mostly copied from [`tracing_subscriber::reload::Layer`]. +struct TraceLayer { + inner: Arc>>, +} + +impl TraceLayer { + fn new(initial: Option) -> (Self, TraceReloadHandle) { + let inner = Arc::new(RwLock::new(initial)); + ( + Self { + inner: inner.clone(), + }, + TraceReloadHandle::new(inner), + ) + } + + fn with_layer(&self, f: impl FnOnce(&OtelTraceLayer) -> R) -> Option { + self.inner + .read() + .ok() + .and_then(|guard| guard.as_ref().map(f)) + } + + fn with_layer_mut(&self, f: impl FnOnce(&mut OtelTraceLayer) -> R) -> Option { + self.inner + .write() + .ok() + .and_then(|mut guard| guard.as_mut().map(f)) + } +} + +impl tracing_subscriber::Layer for TraceLayer { + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + let _ = self.with_layer(|layer| layer.on_register_dispatch(subscriber)); + } + + fn on_layer(&mut self, subscriber: &mut DynSubscriber) { + let _ = self.with_layer_mut(|layer| layer.on_layer(subscriber)); + } + + fn register_callsite( + &self, + metadata: &'static tracing::Metadata<'static>, + ) -> tracing::subscriber::Interest { + self.with_layer(|layer| layer.register_callsite(metadata)) + .unwrap_or_else(tracing::subscriber::Interest::always) + } + + fn enabled( + &self, + metadata: &tracing::Metadata<'_>, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) -> bool { + self.with_layer(|layer| layer.enabled(metadata, ctx)) + .unwrap_or(false) + } + + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_new_span(attrs, id, ctx)); + } + + fn max_level_hint(&self) -> Option { + self.with_layer(|layer| layer.max_level_hint()).flatten() + } + + fn on_record( + &self, + span: &tracing::span::Id, + values: &tracing::span::Record<'_>, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_record(span, values, ctx)); + } + + fn on_follows_from( + &self, + span: &tracing::span::Id, + follows: &tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_follows_from(span, follows, ctx)); + } + + fn event_enabled( + &self, + event: &tracing::Event<'_>, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) -> bool { + self.with_layer(|layer| layer.event_enabled(event, ctx)) + .unwrap_or(false) + } + + fn on_event( + &self, + event: &tracing::Event<'_>, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_event(event, ctx)); + } + + fn on_enter( + &self, + id: &tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_enter(id, ctx)); + } + + fn on_exit( + &self, + id: &tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_exit(id, ctx)); + } + + fn on_close( + &self, + id: tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_close(id, ctx)); + } + + fn on_id_change( + &self, + old: &tracing::span::Id, + new: &tracing::span::Id, + ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>, + ) { + let _ = self.with_layer(|layer| layer.on_id_change(old, new, ctx)); + } + + unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> { + self.inner.read().ok().and_then(|guard| { + guard + .as_ref() + .and_then(|layer| unsafe { layer.downcast_raw(id) }) + }) + } +} /// Handle for reloading trace level pub static TRACE_RELOAD_HANDLE: OnceCell = OnceCell::new(); @@ -407,13 +570,11 @@ pub fn init_global_logging( .set(Mutex::new(trace_state)) .expect("trace state already initialized"); - let initial_trace_layers = initial_tracer + let initial_trace_layer = initial_tracer .as_ref() - .map(|tracer| vec![tracing_opentelemetry::layer().with_tracer(tracer.clone())]) - .unwrap_or_else(Vec::new); + .map(|tracer| tracing_opentelemetry::layer().with_tracer(tracer.clone())); - let (dyn_trace_layer, trace_reload_handle) = - tracing_subscriber::reload::Layer::new(initial_trace_layers); + let (dyn_trace_layer, trace_reload_handle) = TraceLayer::new(initial_trace_layer); TRACE_RELOAD_HANDLE .set(trace_reload_handle) diff --git a/src/servers/src/http/dyn_trace.rs b/src/servers/src/http/dyn_trace.rs index 888806b360..dcdb74c56a 100644 --- a/src/servers/src/http/dyn_trace.rs +++ b/src/servers/src/http/dyn_trace.rs @@ -14,7 +14,7 @@ use axum::http::StatusCode; use axum::response::IntoResponse; -use common_telemetry::{TRACE_RELOAD_HANDLE, error, get_or_init_tracer, info}; +use common_telemetry::{TRACE_RELOAD_HANDLE, get_or_init_tracer, info}; use crate::error::{InvalidParameterSnafu, Result}; @@ -43,32 +43,12 @@ pub async fn dyn_trace_handler(enable_str: String) -> Result }; let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); - match trace_reload_handle.reload(vec![trace_layer]) { - Ok(_) => { - info!("trace enabled"); - Ok((StatusCode::OK, "trace enabled".to_string())) - } - Err(e) => { - error!(e; "Failed to enable trace"); - Ok(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to enable trace: {e}"), - )) - } - } + trace_reload_handle.reload(Some(trace_layer)); + info!("trace enabled"); + Ok((StatusCode::OK, "trace enabled".to_string())) } else { - match trace_reload_handle.reload(vec![]) { - Ok(_) => { - info!("trace disabled"); - Ok((StatusCode::OK, "trace disabled".to_string())) - } - Err(e) => { - error!(e; "Failed to disable trace"); - Ok(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to disable trace: {e}"), - )) - } - } + trace_reload_handle.reload(None); + info!("trace disabled"); + Ok((StatusCode::OK, "trace disabled".to_string())) } }