feat: set and retrieve trace id in log macro (#2016)

* trace id passed by task local store

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify log macro

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove tokio::spawn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use real trace id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-25 11:50:27 +08:00
committed by GitHub
parent 0fc0f74cd7
commit f55bff51ac
12 changed files with 51 additions and 18 deletions

1
Cargo.lock generated
View File

@@ -7035,6 +7035,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-function-macro",
"common-telemetry",
"datafusion",
"datatypes",
"futures",

View File

@@ -20,7 +20,7 @@ mod panic_hook;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
pub use logging::{init_default_ut_logging, init_global_logging};
pub use logging::{init_default_ut_logging, init_global_logging, trace_id, TRACE_ID};
pub use metric::init_default_metrics_recorder;
use once_cell::sync::OnceCell;
pub use panic_hook::set_panic_hook;

View File

@@ -32,6 +32,30 @@ use tracing_subscriber::{filter, EnvFilter, Registry};
pub use crate::{debug, error, info, log, trace, warn};
tokio::task_local! {
/// Task local trace id. See [trace_id](crate::trace_id) for more details.
pub static TRACE_ID: u64;
}
/// Get current [TRACE_ID] from tokio [task_local](tokio::task_local) storage.
///
/// # Usage
/// To set current trace id, wrap your async code like this:
/// ```rust, no_run
/// common_telemetry::TRACE_ID
/// .scope(id, async move {
/// query_handler
/// .do_query(query, self.session.context())
/// .await
/// })
/// .await
/// ```
/// Then all functions called from this stack will be able to retrieve the trace id
/// via this method.
pub fn trace_id() -> Option<u64> {
TRACE_ID.try_with(|id| Some(*id)).unwrap_or(None)
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {

View File

@@ -16,14 +16,16 @@
#[macro_export]
macro_rules! log {
// log!(target: "my_target", Level::INFO, "a {} event", "log");
(target: $target:expr, $lvl:expr, $($arg:tt)+) => {
$crate::logging::event!(target: $target, $lvl, $($arg)+)
};
(target: $target:expr, $lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!(target: $target, $lvl, trace_id = _trace_id, $($arg)+)
}};
// log!(Level::INFO, "a log event")
($lvl:expr, $($arg:tt)+) => {
$crate::logging::event!($lvl, $($arg)+)
};
($lvl:expr, $($arg:tt)+) => {{
let _trace_id = $crate::trace_id();
$crate::logging::event!($lvl, trace_id = _trace_id, $($arg)+)
}};
}
/// Logs a message at the error level.

View File

@@ -182,7 +182,7 @@ impl RegionWorker {
running: running.clone(),
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
};
let handle = common_runtime::spawn_bg(async move {
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
});

View File

@@ -21,6 +21,7 @@ prost.workspace = true
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
common-telemetry = { path = "../common/telemetry" }
[dev-dependencies]
tokio.workspace = true

View File

@@ -113,6 +113,7 @@ impl PromPlanner {
table_provider,
ctx: PromPlannerContext::from_eval_stmt(&stmt),
};
planner.prom_expr_to_plan(stmt.expr).await
}

View File

@@ -21,7 +21,7 @@ use async_trait::async_trait;
use chrono::{NaiveDate, NaiveDateTime};
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{error, logging, timer, trace, warn};
use common_telemetry::{error, info, logging, timer, warn};
use datatypes::prelude::ConcreteDataType;
use metrics::increment_counter;
use opensrv_mysql::{
@@ -90,17 +90,22 @@ impl MysqlInstanceShim {
}
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
trace!("Start executing query: '{}'", query);
let trace_id = query_ctx.trace_id();
info!("Start executing query: '{}'", query);
let start = Instant::now();
let output = if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone())
{
vec![Ok(output)]
} else {
self.query_handler.do_query(query, query_ctx).await
common_telemetry::TRACE_ID
.scope(trace_id, async move {
self.query_handler.do_query(query, query_ctx).await
})
.await
};
trace!(
info!(
"Finished executing query: '{}', total time costs in microseconds: {}",
query,
start.elapsed().as_micros()

View File

@@ -214,7 +214,7 @@ impl Server for MysqlServer {
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream));
let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle).await?;
Ok(addr)
}

View File

@@ -115,7 +115,7 @@ impl Server for OpentsdbServer {
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream));
let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle).await?;
Ok(addr)
}

View File

@@ -185,14 +185,14 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let _handle = tokio::spawn(async move {
let _handle = common_runtime::spawn_read(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let query_handler = query_handler.clone();
let connection = Connection::new(stream);
let shutdown = Shutdown::new(notify_shutdown.subscribe());
let _handle = tokio::spawn(async move {
let _handle = common_runtime::spawn_read(async move {
Handler::new(query_handler, connection, shutdown)
.run()
.await

View File

@@ -23,7 +23,6 @@ use common_telemetry::{debug, warn};
use futures::StreamExt;
use metrics::{decrement_gauge, increment_gauge};
use pgwire::tokio::process_socket;
use tokio;
use tokio_rustls::TlsAcceptor;
use super::{MakePostgresServerHandler, MakePostgresServerHandlerBuilder};
@@ -128,7 +127,7 @@ impl Server for PostgresServer {
.map(|server_conf| Arc::new(TlsAcceptor::from(Arc::new(server_conf))));
let io_runtime = self.base_server.io_runtime();
let join_handle = tokio::spawn(self.accept(io_runtime, stream, tls_acceptor));
let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream, tls_acceptor));
self.base_server.start_with(join_handle).await?;
Ok(addr)