From f55bff51ac624fb2bda23dcef64c6f53d440b0d4 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Jul 2023 11:50:27 +0800 Subject: [PATCH] feat: set and retrieve trace id in log macro (#2016) * trace id passed by task local store Signed-off-by: Ruihang Xia * modify log macro Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * remove tokio::spawn Signed-off-by: Ruihang Xia * use real trace id Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/common/telemetry/src/lib.rs | 2 +- src/common/telemetry/src/logging.rs | 24 ++++++++++++++++++++++++ src/common/telemetry/src/macros.rs | 14 ++++++++------ src/mito2/src/worker.rs | 2 +- src/promql/Cargo.toml | 1 + src/promql/src/planner.rs | 1 + src/servers/src/mysql/handler.rs | 13 +++++++++---- src/servers/src/mysql/server.rs | 2 +- src/servers/src/opentsdb.rs | 2 +- src/servers/src/opentsdb/handler.rs | 4 ++-- src/servers/src/postgres/server.rs | 3 +-- 12 files changed, 51 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a7bc5d95b..9adb2e139b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7035,6 +7035,7 @@ dependencies = [ "common-catalog", "common-error", "common-function-macro", + "common-telemetry", "datafusion", "datatypes", "futures", diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs index 0c760dd968..91d67b92ce 100644 --- a/src/common/telemetry/src/lib.rs +++ b/src/common/telemetry/src/lib.rs @@ -20,7 +20,7 @@ mod panic_hook; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -pub use logging::{init_default_ut_logging, init_global_logging}; +pub use logging::{init_default_ut_logging, init_global_logging, trace_id, TRACE_ID}; pub use metric::init_default_metrics_recorder; use once_cell::sync::OnceCell; pub use panic_hook::set_panic_hook; diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 338d828ee4..2d757ed596 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -32,6 +32,30 @@ use tracing_subscriber::{filter, EnvFilter, Registry}; pub use crate::{debug, error, info, log, trace, warn}; +tokio::task_local! { + /// Task local trace id. See [trace_id](crate::trace_id) for more details. + pub static TRACE_ID: u64; +} + +/// Get current [TRACE_ID] from tokio [task_local](tokio::task_local) storage. +/// +/// # Usage +/// To set current trace id, wrap your async code like this: +/// ```rust, no_run +/// common_telemetry::TRACE_ID +/// .scope(id, async move { +/// query_handler +/// .do_query(query, self.session.context()) +/// .await +/// }) +/// .await +/// ``` +/// Then all functions called from this stack will be able to retrieve the trace id +/// via this method. +pub fn trace_id() -> Option { + TRACE_ID.try_with(|id| Some(*id)).unwrap_or(None) +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct LoggingOptions { diff --git a/src/common/telemetry/src/macros.rs b/src/common/telemetry/src/macros.rs index 7f5e38f18f..fed42bdb6d 100644 --- a/src/common/telemetry/src/macros.rs +++ b/src/common/telemetry/src/macros.rs @@ -16,14 +16,16 @@ #[macro_export] macro_rules! log { // log!(target: "my_target", Level::INFO, "a {} event", "log"); - (target: $target:expr, $lvl:expr, $($arg:tt)+) => { - $crate::logging::event!(target: $target, $lvl, $($arg)+) - }; + (target: $target:expr, $lvl:expr, $($arg:tt)+) => {{ + let _trace_id = $crate::trace_id(); + $crate::logging::event!(target: $target, $lvl, trace_id = _trace_id, $($arg)+) + }}; // log!(Level::INFO, "a log event") - ($lvl:expr, $($arg:tt)+) => { - $crate::logging::event!($lvl, $($arg)+) - }; + ($lvl:expr, $($arg:tt)+) => {{ + let _trace_id = $crate::trace_id(); + $crate::logging::event!($lvl, trace_id = _trace_id, $($arg)+) + }}; } /// Logs a message at the error level. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 838994adf9..c096575623 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -182,7 +182,7 @@ impl RegionWorker { running: running.clone(), memtable_builder: Arc::new(DefaultMemtableBuilder::default()), }; - let handle = common_runtime::spawn_bg(async move { + let handle = common_runtime::spawn_write(async move { worker_thread.run().await; }); diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index c9df5fb376..c594b93351 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -21,6 +21,7 @@ prost.workspace = true session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } +common-telemetry = { path = "../common/telemetry" } [dev-dependencies] tokio.workspace = true diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index dcaaf89124..f79bf95deb 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -113,6 +113,7 @@ impl PromPlanner { table_provider, ctx: PromPlannerContext::from_eval_stmt(&stmt), }; + planner.prom_expr_to_plan(stmt.expr).await } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index e39b3bd143..d833aca555 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use chrono::{NaiveDate, NaiveDateTime}; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::{error, logging, timer, trace, warn}; +use common_telemetry::{error, info, logging, timer, warn}; use datatypes::prelude::ConcreteDataType; use metrics::increment_counter; use opensrv_mysql::{ @@ -90,17 +90,22 @@ impl MysqlInstanceShim { } async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - trace!("Start executing query: '{}'", query); + let trace_id = query_ctx.trace_id(); + info!("Start executing query: '{}'", query); let start = Instant::now(); let output = if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone()) { vec![Ok(output)] } else { - self.query_handler.do_query(query, query_ctx).await + common_telemetry::TRACE_ID + .scope(trace_id, async move { + self.query_handler.do_query(query, query_ctx).await + }) + .await }; - trace!( + info!( "Finished executing query: '{}', total time costs in microseconds: {}", query, start.elapsed().as_micros() diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index 3a96ca2432..301a082c11 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -214,7 +214,7 @@ impl Server for MysqlServer { let (stream, addr) = self.base_server.bind(listening).await?; let io_runtime = self.base_server.io_runtime(); - let join_handle = tokio::spawn(self.accept(io_runtime, stream)); + let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream)); self.base_server.start_with(join_handle).await?; Ok(addr) } diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 93860fc45b..7c569cc2ff 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -115,7 +115,7 @@ impl Server for OpentsdbServer { let (stream, addr) = self.base_server.bind(listening).await?; let io_runtime = self.base_server.io_runtime(); - let join_handle = tokio::spawn(self.accept(io_runtime, stream)); + let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream)); self.base_server.start_with(join_handle).await?; Ok(addr) } diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs index e374e84606..bc28f18be0 100644 --- a/src/servers/src/opentsdb/handler.rs +++ b/src/servers/src/opentsdb/handler.rs @@ -185,14 +185,14 @@ mod tests { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - let _handle = tokio::spawn(async move { + let _handle = common_runtime::spawn_read(async move { loop { let (stream, _) = listener.accept().await.unwrap(); let query_handler = query_handler.clone(); let connection = Connection::new(stream); let shutdown = Shutdown::new(notify_shutdown.subscribe()); - let _handle = tokio::spawn(async move { + let _handle = common_runtime::spawn_read(async move { Handler::new(query_handler, connection, shutdown) .run() .await diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index cc83ece444..6418f196c9 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -23,7 +23,6 @@ use common_telemetry::{debug, warn}; use futures::StreamExt; use metrics::{decrement_gauge, increment_gauge}; use pgwire::tokio::process_socket; -use tokio; use tokio_rustls::TlsAcceptor; use super::{MakePostgresServerHandler, MakePostgresServerHandlerBuilder}; @@ -128,7 +127,7 @@ impl Server for PostgresServer { .map(|server_conf| Arc::new(TlsAcceptor::from(Arc::new(server_conf)))); let io_runtime = self.base_server.io_runtime(); - let join_handle = tokio::spawn(self.accept(io_runtime, stream, tls_acceptor)); + let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream, tls_acceptor)); self.base_server.start_with(join_handle).await?; Ok(addr)