diff --git a/Cargo.lock b/Cargo.lock index 03e5c25a81..6a660df9c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,6 +2060,7 @@ dependencies = [ "parking_lot 0.12.1", "prometheus", "serde", + "serde_json", "tokio", "tracing", "tracing-appender", diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 7c05d93372..7f1181ee59 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -28,6 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; +use common_telemetry::tracing_context::TracingContext; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; @@ -125,7 +126,12 @@ impl RegionRequester { let metrics_str = Arc::new(ArcSwapOption::from(None)); let ref_str = metrics_str.clone(); + let tracing_context = TracingContext::from_current_span(); + let stream = Box::pin(stream!({ + let _span = tracing_context.attach(common_telemetry::tracing::info_span!( + "poll_flight_data_stream" + )); while let Some(flight_message) = flight_message_stream.next().await { let flight_message = flight_message .map_err(BoxedError::new) diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 2d03aa45d1..c6b76a61e5 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -24,6 +24,7 @@ opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] } parking_lot = { version = "0.12" } prometheus.workspace = true serde.workspace = true +serde_json.workspace = true tokio.workspace = true tracing = "0.1" tracing-appender = "0.2" diff --git a/src/common/telemetry/src/tracing_context.rs b/src/common/telemetry/src/tracing_context.rs index b6aed81c85..bf9c3dd916 100644 --- a/src/common/telemetry/src/tracing_context.rs +++ b/src/common/telemetry/src/tracing_context.rs @@ -89,4 +89,17 @@ impl TracingContext { let context = Propagator::new().extract(fields); Self(context) } + + /// Convert the tracing context to a JSON string in W3C trace context format. + pub fn to_json(&self) -> String { + serde_json::to_string(&self.to_w3c()).unwrap() + } + + /// Create a new tracing context from a JSON string in W3C trace context format. + /// + /// Illegal json string will produce an empty tracing context and no error will be reported. + pub fn from_json(json: &str) -> Self { + let fields: W3cTrace = serde_json::from_str(json).unwrap_or_default(); + Self::from_w3c(&fields) + } } diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index df72240789..eb0e9165d9 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -15,7 +15,7 @@ use common_error::ext::ErrorExt; use common_meta::instruction::{InstructionReply, SimpleReply}; use common_meta::RegionIdent; -use common_telemetry::warn; +use common_telemetry::{tracing, warn}; use futures_util::future::BoxFuture; use store_api::region_request::{RegionCloseRequest, RegionRequest}; @@ -23,6 +23,7 @@ use crate::error; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { + #[tracing::instrument(skip_all)] pub(crate) fn handle_close_region_instruction( self, region_ident: RegionIdent, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index e798f273e1..ad5b6e13e4 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -123,6 +123,7 @@ impl RegionServer { }) } + #[tracing::instrument(skip_all, fields(request_type = request.request_type()))] pub async fn handle_request( &self, region_id: RegionId, @@ -226,6 +227,7 @@ impl RegionServerHandler for RegionServer { .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?; let tracing_context = TracingContext::from_current_span(); + let results = if is_parallel { let join_tasks = requests.into_iter().map(|(region_id, req)| { let self_to_move = self.clone(); @@ -488,17 +490,11 @@ impl RegionServerInner { CurrentEngine::EarlyReturn(rows) => return Ok(rows), }; - let engine_type = engine.name(); - // Sets corresponding region status to registering/deregistering before the operation. self.set_region_status_not_ready(region_id, &engine, ®ion_change); match engine .handle_request(region_id, request) - .trace(info_span!( - "RegionEngine::handle_region_request", - engine_type - )) .await .with_context(|_| HandleRegionRequestSnafu { region_id }) { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index b84be68798..943acdc593 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -51,6 +51,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::tracing; use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -281,6 +282,7 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } + #[tracing::instrument(skip_all)] async fn handle_request( &self, region_id: RegionId, @@ -293,6 +295,7 @@ impl RegionEngine for MitoEngine { } /// Handle substrait query and return a stream of record batches + #[tracing::instrument(skip_all)] async fn handle_query( &self, region_id: RegionId, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d9595673f4..2093921946 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -21,7 +21,7 @@ use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, tracing}; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; @@ -286,6 +286,7 @@ impl SeqScan { } /// Fetch a batch from the reader and convert it into a record batch. + #[tracing::instrument(skip_all, level = "trace")] async fn fetch_record_batch( reader: &mut dyn BatchReader, mapper: &ProjectionMapper, diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index f1d59a5181..d4e4516d1b 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -29,7 +29,6 @@ use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, }; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{tracing, warn}; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }; @@ -158,7 +157,6 @@ impl MergeScanExec { }) } - #[tracing::instrument(skip_all)] pub fn to_stream(&self, context: Arc) -> Result { let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); @@ -167,8 +165,7 @@ impl MergeScanExec { let schema = Self::arrow_schema_to_schema(self.schema())?; let dbname = context.task_id().unwrap_or_default(); - - let tracing_context = TracingContext::from_current_span().to_w3c(); + let tracing_context = TracingContext::from_json(context.session_id().as_str()); let stream = Box::pin(stream!({ METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64); @@ -179,7 +176,7 @@ impl MergeScanExec { for region_id in regions { let request = QueryRequest { header: Some(RegionRequestHeader { - tracing_context: tracing_context.clone(), + tracing_context: tracing_context.to_w3c(), dbname: dbname.clone(), }), region_id: region_id.into(), diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index d5361b4e5f..b90b7d41f6 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::{SessionState, TaskContext}; use session::context::QueryContextRef; @@ -41,9 +42,14 @@ impl QueryEngineContext { pub fn build_task_ctx(&self) -> Arc { let dbname = self.query_ctx.get_db_string(); let state = &self.state; + let tracing_context = TracingContext::from_current_span(); + + // pass tracing context in session_id + let session_id = tracing_context.to_json(); + Arc::new(TaskContext::new( Some(dbname), - state.session_id().to_string(), + session_id, state.config().clone(), state.scalar_functions().clone(), state.aggregate_functions().clone(), diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 472b287c13..c2d066a73b 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -18,7 +18,7 @@ use std::task::{Context, Poll}; use arrow_flight::FlightData; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing::info_span; +use common_telemetry::tracing::{info_span, Instrument}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::warn; use futures::channel::mpsc; @@ -66,7 +66,7 @@ impl FlightRecordBatchStream { return; } - while let Some(batch_or_err) = recordbatches.next().await { + while let Some(batch_or_err) = recordbatches.next().in_current_span().await { match batch_or_err { Ok(recordbatch) => { if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await { diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 9c1540dcfa..43a7dcdad9 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -92,7 +92,7 @@ impl MysqlInstanceShim { } } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, name = "mysql::do_query")] async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone()) @@ -335,6 +335,7 @@ impl AsyncMysqlShim for MysqlInstanceShi let _ = guard.remove(&stmt_id); } + #[tracing::instrument(skip_all)] async fn on_query<'a>( &'a mut self, query: &'a str, diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 3bc6ec77ee..45e2782315 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -23,6 +23,8 @@ use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_telemetry::tracing::Span; +use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; @@ -97,14 +99,18 @@ impl PhysicalPlan for StreamScanAdapter { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> QueryResult { + let tracing_context = TracingContext::from_json(context.session_id().as_str()); + let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter")); + let mut stream = self.stream.lock().unwrap(); let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); Ok(Box::pin(StreamWithMetricWrapper { stream, metric: mem_usage_metrics, + span, })) } @@ -116,6 +122,7 @@ impl PhysicalPlan for StreamScanAdapter { pub struct StreamWithMetricWrapper { stream: SendableRecordBatchStream, metric: MemoryUsageMetrics, + span: Span, } impl Stream for StreamWithMetricWrapper { @@ -123,6 +130,7 @@ impl Stream for StreamWithMetricWrapper { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + let _enter = this.span.enter(); let poll = this.stream.poll_next_unpin(cx); if let Poll::Ready(Some(Ok(record_batch))) = &poll { let batch_mem_size = record_batch