feat: organize tracing on query path (#3310)

* feat: organize tracing on query path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* warp json conversion to TracingContext's methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unnecessary .trace()

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/dist_plan/merge_scan.rs

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Ruihang Xia
2024-02-18 23:04:57 +08:00
committed by GitHub
parent df6260d525
commit 72cd443ba3
13 changed files with 52 additions and 18 deletions

1
Cargo.lock generated
View File

@@ -2060,6 +2060,7 @@ dependencies = [
"parking_lot 0.12.1",
"prometheus",
"serde",
"serde_json",
"tokio",
"tracing",
"tracing-appender",

View File

@@ -28,6 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
@@ -125,7 +126,12 @@ impl RegionRequester {
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();
let tracing_context = TracingContext::from_current_span();
let stream = Box::pin(stream!({
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream"
));
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)

View File

@@ -24,6 +24,7 @@ opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing = "0.1"
tracing-appender = "0.2"

View File

@@ -89,4 +89,17 @@ impl TracingContext {
let context = Propagator::new().extract(fields);
Self(context)
}
/// Convert the tracing context to a JSON string in W3C trace context format.
pub fn to_json(&self) -> String {
serde_json::to_string(&self.to_w3c()).unwrap()
}
/// Create a new tracing context from a JSON string in W3C trace context format.
///
/// Illegal json string will produce an empty tracing context and no error will be reported.
pub fn from_json(json: &str) -> Self {
let fields: W3cTrace = serde_json::from_str(json).unwrap_or_default();
Self::from_w3c(&fields)
}
}

View File

@@ -15,7 +15,7 @@
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_telemetry::warn;
use common_telemetry::{tracing, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
@@ -23,6 +23,7 @@ use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
#[tracing::instrument(skip_all)]
pub(crate) fn handle_close_region_instruction(
self,
region_ident: RegionIdent,

View File

@@ -123,6 +123,7 @@ impl RegionServer {
})
}
#[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
pub async fn handle_request(
&self,
region_id: RegionId,
@@ -226,6 +227,7 @@ impl RegionServerHandler for RegionServer {
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let results = if is_parallel {
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
@@ -488,17 +490,11 @@ impl RegionServerInner {
CurrentEngine::EarlyReturn(rows) => return Ok(rows),
};
let engine_type = engine.name();
// Sets corresponding region status to registering/deregistering before the operation.
self.set_region_status_not_ready(region_id, &engine, &region_change);
match engine
.handle_request(region_id, request)
.trace(info_span!(
"RegionEngine::handle_region_request",
engine_type
))
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
{

View File

@@ -51,6 +51,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
@@ -281,6 +282,7 @@ impl RegionEngine for MitoEngine {
MITO_ENGINE_NAME
}
#[tracing::instrument(skip_all)]
async fn handle_request(
&self,
region_id: RegionId,
@@ -293,6 +295,7 @@ impl RegionEngine for MitoEngine {
}
/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
&self,
region_id: RegionId,

View File

@@ -21,7 +21,7 @@ use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, tracing};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use table::predicate::Predicate;
@@ -286,6 +286,7 @@ impl SeqScan {
}
/// Fetch a batch from the reader and convert it into a record batch.
#[tracing::instrument(skip_all, level = "trace")]
async fn fetch_record_batch(
reader: &mut dyn BatchReader,
mapper: &ProjectionMapper,

View File

@@ -29,7 +29,6 @@ use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{tracing, warn};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -158,7 +157,6 @@ impl MergeScanExec {
})
}
#[tracing::instrument(skip_all)]
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
let regions = self.regions.clone();
@@ -167,8 +165,7 @@ impl MergeScanExec {
let schema = Self::arrow_schema_to_schema(self.schema())?;
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_current_span().to_w3c();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let stream = Box::pin(stream!({
METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -179,7 +176,7 @@ impl MergeScanExec {
for region_id in regions {
let request = QueryRequest {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.clone(),
tracing_context: tracing_context.to_w3c(),
dbname: dbname.clone(),
}),
region_id: region_id.into(),

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::context::{SessionState, TaskContext};
use session::context::QueryContextRef;
@@ -41,9 +42,14 @@ impl QueryEngineContext {
pub fn build_task_ctx(&self) -> Arc<TaskContext> {
let dbname = self.query_ctx.get_db_string();
let state = &self.state;
let tracing_context = TracingContext::from_current_span();
// pass tracing context in session_id
let session_id = tracing_context.to_json();
Arc::new(TaskContext::new(
Some(dbname),
state.session_id().to_string(),
session_id,
state.config().clone(),
state.scalar_functions().clone(),
state.aggregate_functions().clone(),

View File

@@ -18,7 +18,7 @@ use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing::{info_span, Instrument};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::warn;
use futures::channel::mpsc;
@@ -66,7 +66,7 @@ impl FlightRecordBatchStream {
return;
}
while let Some(batch_or_err) = recordbatches.next().await {
while let Some(batch_or_err) = recordbatches.next().in_current_span().await {
match batch_or_err {
Ok(recordbatch) => {
if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await {

View File

@@ -92,7 +92,7 @@ impl MysqlInstanceShim {
}
}
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, name = "mysql::do_query")]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if let Some(output) =
crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone())
@@ -335,6 +335,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let _ = guard.remove(&stmt_id);
}
#[tracing::instrument(skip_all)]
async fn on_query<'a>(
&'a mut self,
query: &'a str,

View File

@@ -23,6 +23,8 @@ use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
@@ -97,14 +99,18 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
let mut stream = self.stream.lock().unwrap();
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: mem_usage_metrics,
span,
}))
}
@@ -116,6 +122,7 @@ impl PhysicalPlan for StreamScanAdapter {
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: MemoryUsageMetrics,
span: Span,
}
impl Stream for StreamWithMetricWrapper {
@@ -123,6 +130,7 @@ impl Stream for StreamWithMetricWrapper {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _enter = this.span.enter();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(record_batch))) = &poll {
let batch_mem_size = record_batch