diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs
index 2e1c890427..00b96e5292 100644
--- a/src/catalog/src/system_schema.rs
+++ b/src/catalog/src/system_schema.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 
 use common_error::ext::BoxedError;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
+use common_telemetry::tracing::Span;
 use datatypes::schema::SchemaRef;
 use futures_util::StreamExt;
 use snafu::ResultExt;
@@ -163,6 +164,7 @@ impl DataSource for SystemTableDataSource {
             stream: Box::pin(stream),
             output_ordering: None,
             metrics: Default::default(),
+            span: Span::current(),
         };
 
         Ok(Box::pin(stream))
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 239f3fe3f9..6a7ac62fc3 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -37,6 +37,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_query::Output;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
+use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::W3cTrace;
 use common_telemetry::{error, warn};
 use futures::future;
@@ -456,6 +457,7 @@ impl Database {
             stream,
             output_ordering: None,
             metrics: Default::default(),
+            span: Span::current(),
         };
         Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
     }
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index 3e80b83cec..8eefb16e0d 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -30,6 +30,7 @@ use common_query::request::QueryRequest;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
 use common_telemetry::error;
+use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
 use prost::Message;
 use query::query_engine::DefaultSerializer;
@@ -242,6 +243,7 @@ impl RegionRequester {
             stream,
             output_ordering: None,
             metrics,
+            span: Span::current(),
         };
 
         Ok(Box::pin(record_batch_stream))
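
All three call sites above follow the same pattern: snapshot the span that is active when the stream wrapper is built, so it can be re-entered later when the stream is polled from another task or thread. A minimal sketch of that pattern, with illustrative names (`WrappedStream` is not from the codebase):

    use tracing::Span;

    struct WrappedStream<S> {
        inner: S,
        // Span::current() snapshots whichever span is active at construction
        // time; a Span handle is cheap to clone and can cross thread boundaries.
        span: Span,
    }

    impl<S> WrappedStream<S> {
        fn new(inner: S) -> Self {
            Self {
                inner,
                span: Span::current(),
            }
        }
    }
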
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index 7e504559b6..fc12d87dcf 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use common_base::readable_size::ReadableSize;
+use common_telemetry::tracing::{Span, info_span};
 use common_time::util::format_nanoseconds_human_readable;
 use datafusion::arrow::compute::cast;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
@@ -218,6 +219,7 @@ pub struct RecordBatchStreamAdapter {
     metrics_2: Metrics,
     /// Display plan and metrics in verbose mode.
     explain_verbose: bool,
+    span: Span,
 }
 
 /// Json encoded metrics. Contains metric from a whole plan tree.
@@ -238,22 +240,21 @@ impl RecordBatchStreamAdapter {
             metrics: None,
             metrics_2: Metrics::Unavailable,
             explain_verbose: false,
+            span: Span::current(),
         })
     }
 
-    pub fn try_new_with_metrics_and_df_plan(
-        stream: DfSendableRecordBatchStream,
-        metrics: BaselineMetrics,
-        df_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
+    pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
         let schema =
             Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
+        let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
         Ok(Self {
             schema,
             stream,
-            metrics: Some(metrics),
-            metrics_2: Metrics::Unresolved(df_plan),
+            metrics: None,
+            metrics_2: Metrics::Unavailable,
             explain_verbose: false,
+            span: subspan,
         })
     }
 
@@ -300,6 +301,8 @@ impl Stream for RecordBatchStreamAdapter {
             .map(|m| m.elapsed_compute().clone())
             .unwrap_or_default();
         let _guard = timer.timer();
+        let poll_span = info_span!(parent: &self.span, "poll_next");
+        let _entered = poll_span.enter();
         match Pin::new(&mut self.stream).poll_next(cx) {
             Poll::Pending => Poll::Pending,
             Poll::Ready(Some(df_record_batch)) => {
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index c1253cfa1c..85e0d5c496 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -29,6 +29,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use adapter::RecordBatchMetrics;
 use arc_swap::ArcSwapOption;
 use common_base::readable_size::ReadableSize;
+use common_telemetry::tracing::Span;
 pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
 use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
 use datatypes::arrow::compute::SortOptions;
@@ -370,6 +371,7 @@ pub struct RecordBatchStreamWrapper<S> {
     pub stream: S,
     pub output_ordering: Option<Vec<OrderOption>>,
     pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
+    pub span: Span,
 }
 
 impl RecordBatchStreamWrapper<SendableRecordBatchStream> {
@@ -380,6 +382,7 @@ impl RecordBatchStreamWrapper<SendableRecordBatchStream> {
             stream,
             output_ordering: None,
             metrics: Default::default(),
+            span: Span::current(),
         }
     }
 }
@@ -408,6 +411,7 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
     type Item = Result<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let _entered = self.span.clone().entered();
         Pin::new(&mut self.stream).poll_next(ctx)
     }
 }
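
The `poll_next` change above is the second half of the pattern: entering a span only affects the current thread while the guard lives, so an executor-driven stream must re-enter its stored span on every poll rather than once at creation. A self-contained sketch (names are illustrative):

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::Stream;
    use tracing::Span;

    struct WrappedStream<S> {
        inner: S,
        span: Span,
    }

    impl<S: Stream + Unpin> Stream for WrappedStream<S> {
        type Item = S::Item;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `entered()` consumes the cloned handle and returns a guard that
            // exits the span when it drops at the end of this poll.
            let _entered = self.span.clone().entered();
            Pin::new(&mut self.inner).poll_next(cx)
        }
    }
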
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 3c2479fdb5..1a5f2fad66 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -91,6 +91,7 @@ use sql::statements::tql::Tql;
 use sqlparser::ast::ObjectName;
 pub use standalone::StandaloneDatanodeManager;
 use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
+use tracing::Span;
 
 use crate::error::{
     self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
@@ -508,6 +509,7 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
                 stream: s,
                 output_ordering: None,
                 metrics: Default::default(),
+                span: Span::current(),
             };
             Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
         }
diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs
index 8a965a2453..9a323eb989 100644
--- a/src/frontend/src/instance/prom_store.rs
+++ b/src/frontend/src/instance/prom_store.rs
@@ -40,6 +40,7 @@ use servers::query_handler::{
 };
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
+use tracing::instrument;
 
 use crate::error::{
     CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
@@ -78,6 +79,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Res
 
+#[instrument(skip_all, fields(table_name))]
 async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
     let OutputData::Stream(stream) = output.data else {
         unreachable!()
@@ -194,6 +196,7 @@ impl PromStoreProtocolHandler for Instance {
         Ok(output)
     }
 
+    #[instrument(skip_all, fields(table_name))]
     async fn read(
         &self,
         request: ReadRequest,
diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs
index f7fbc43f63..d34922956b 100644
--- a/src/frontend/src/instance/standalone.rs
+++ b/src/frontend/src/instance/standalone.rs
@@ -97,12 +97,16 @@ impl Datanode for RegionInvoker {
     }
 
     async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
+        let region_id = request.region_id.to_string();
         let span = request
             .header
             .as_ref()
             .map(|h| TracingContext::from_w3c(&h.tracing_context))
             .unwrap_or_default()
-            .attach(tracing::info_span!("RegionInvoker::handle_query"));
+            .attach(tracing::info_span!(
+                "RegionInvoker::handle_query",
+                region_id = region_id
+            ));
         self.region_server
             .handle_read(request)
             .trace(span)
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index a3c818c015..3c0b551741 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -393,6 +393,7 @@ impl MitoEngine {
     }
 
     /// Scans a region.
+    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
     fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
         self.inner.scan_region(region_id, request)
     }
@@ -972,6 +973,7 @@ impl EngineInner {
     }
 
     /// Handles the scan `request` and returns a [ScanRegion].
+    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
     fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
         let query_start = Instant::now();
         // Reading a region doesn't need to go through the region worker thread.
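
`#[tracing::instrument]`, used heavily from here on, wraps a function body (or the future returned by an `async fn`) in a new span. `skip_all` keeps arguments out of the span's fields, and `fields(name = %expr)` records an explicit field through the expression's `Display` impl. A standalone illustration; the types here are stand-ins, not the crate's:

    use tracing::instrument;

    #[derive(Clone, Copy)]
    struct RegionId(u64);

    impl std::fmt::Display for RegionId {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    // The body runs inside a span named `scan_region` that carries only the
    // `region_id` field; `request` is skipped entirely.
    #[instrument(skip_all, fields(region_id = %region_id))]
    fn scan_region(region_id: RegionId, request: Vec<u8>) {
        let _ = request;
    }
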
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index cbc6720515..f6d39ee901 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -24,6 +24,7 @@ use api::v1::SemanticType;
 use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
 use common_recordbatch::filter::SimpleFilterEvaluator;
+use common_telemetry::tracing::Instrument;
 use common_telemetry::{debug, error, tracing, warn};
 use common_time::range::TimestampRange;
 use datafusion_common::Column;
@@ -319,6 +320,7 @@ impl ScanRegion {
     }
 
     /// Returns a [Scanner] to scan the region.
+    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
     pub(crate) async fn scanner(self) -> Result<Scanner> {
         if self.use_series_scan() {
             self.series_scan().await.map(Scanner::Series)
@@ -332,7 +334,11 @@ impl ScanRegion {
     }
 
     /// Returns a [RegionScanner] to scan the region.
-    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
+    #[tracing::instrument(
+        level = tracing::Level::DEBUG,
+        skip_all,
+        fields(region_id = %self.region_id())
+    )]
     pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
         if self.use_series_scan() {
             self.series_scan()
@@ -348,18 +354,21 @@ impl ScanRegion {
     }
 
     /// Scan sequentially.
+    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
     pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
         let input = self.scan_input().await?.with_compaction(false);
         Ok(SeqScan::new(input))
     }
 
     /// Unordered scan.
+    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
     pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
         let input = self.scan_input().await?;
         Ok(UnorderedScan::new(input))
     }
 
     /// Scans by series.
+    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
     pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
         let input = self.scan_input().await?;
         Ok(SeriesScan::new(input))
@@ -390,6 +399,7 @@ impl ScanRegion {
     }
 
     /// Creates a scan input.
+    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
     async fn scan_input(mut self) -> Result<ScanInput> {
         let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
         let time_range = self.build_time_range_predicate();
@@ -907,6 +917,13 @@ impl ScanInput {
     /// Scans sources in parallel.
     ///
     /// # Panics if the input doesn't allow parallel scan.
+    #[tracing::instrument(
+        skip(self, sources, semaphore),
+        fields(
+            region_id = %self.region_metadata().region_id,
+            source_count = sources.len()
+        )
+    )]
     pub(crate) fn create_parallel_sources(
         &self,
         sources: Vec<Source>,
@@ -956,6 +973,13 @@ impl ScanInput {
     }
 
     /// Prunes a file to scan and returns the builder to build readers.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.region_metadata().region_id,
+            file_id = %file.file_id()
+        )
+    )]
     pub async fn prune_file(
         &self,
         file: &FileHandle,
@@ -1021,38 +1045,58 @@ impl ScanInput {
     }
 
     /// Scans the input source in another task and sends batches to the sender.
+    #[tracing::instrument(
+        skip(self, input, semaphore, sender),
+        fields(region_id = %self.region_metadata().region_id)
+    )]
     pub(crate) fn spawn_scan_task(
         &self,
         mut input: Source,
         semaphore: Arc<Semaphore>,
         sender: mpsc::Sender<Result<Batch>>,
     ) {
-        common_runtime::spawn_global(async move {
-            loop {
-                // We release the permit before sending result to avoid the task waiting on
-                // the channel with the permit held.
-                let maybe_batch = {
-                    // Safety: We never close the semaphore.
-                    let _permit = semaphore.acquire().await.unwrap();
-                    input.next_batch().await
-                };
-                match maybe_batch {
-                    Ok(Some(batch)) => {
-                        let _ = sender.send(Ok(batch)).await;
-                    }
-                    Ok(None) => break,
-                    Err(e) => {
-                        let _ = sender.send(Err(e)).await;
-                        break;
-                    }
-                }
-            }
-        });
+        let region_id = self.region_metadata().region_id;
+        let span = tracing::info_span!(
+            "ScanInput::parallel_scan_task",
+            region_id = %region_id,
+            stream_kind = "batch"
+        );
+        common_runtime::spawn_global(
+            async move {
+                loop {
+                    // We release the permit before sending result to avoid the task waiting on
+                    // the channel with the permit held.
+                    let maybe_batch = {
+                        // Safety: We never close the semaphore.
+                        let _permit = semaphore.acquire().await.unwrap();
+                        input.next_batch().await
+                    };
+                    match maybe_batch {
+                        Ok(Some(batch)) => {
+                            let _ = sender.send(Ok(batch)).await;
+                        }
+                        Ok(None) => break,
+                        Err(e) => {
+                            let _ = sender.send(Err(e)).await;
+                            break;
+                        }
+                    }
+                }
+            }
+            .instrument(span),
+        );
     }
 
     /// Scans flat sources (RecordBatch streams) in parallel.
     ///
     /// # Panics if the input doesn't allow parallel scan.
+    #[tracing::instrument(
+        skip(self, sources, semaphore),
+        fields(
+            region_id = %self.region_metadata().region_id,
+            source_count = sources.len()
+        )
+    )]
     pub(crate) fn create_parallel_flat_sources(
         &self,
         sources: Vec<BoxedRecordBatchStream>,
@@ -1076,33 +1120,46 @@ impl ScanInput {
     }
 
     /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
+    #[tracing::instrument(
+        skip(self, input, semaphore, sender),
+        fields(region_id = %self.region_metadata().region_id)
+    )]
     pub(crate) fn spawn_flat_scan_task(
         &self,
         mut input: BoxedRecordBatchStream,
         semaphore: Arc<Semaphore>,
         sender: mpsc::Sender<Result<RecordBatch>>,
     ) {
-        common_runtime::spawn_global(async move {
-            loop {
-                // We release the permit before sending result to avoid the task waiting on
-                // the channel with the permit held.
-                let maybe_batch = {
-                    // Safety: We never close the semaphore.
-                    let _permit = semaphore.acquire().await.unwrap();
-                    input.next().await
-                };
-                match maybe_batch {
-                    Some(Ok(batch)) => {
-                        let _ = sender.send(Ok(batch)).await;
-                    }
-                    Some(Err(e)) => {
-                        let _ = sender.send(Err(e)).await;
-                        break;
-                    }
-                    None => break,
-                }
-            }
-        });
+        let region_id = self.region_metadata().region_id;
+        let span = tracing::info_span!(
+            "ScanInput::parallel_scan_task",
+            region_id = %region_id,
+            stream_kind = "flat"
+        );
+        common_runtime::spawn_global(
+            async move {
+                loop {
+                    // We release the permit before sending result to avoid the task waiting on
+                    // the channel with the permit held.
+                    let maybe_batch = {
+                        // Safety: We never close the semaphore.
+                        let _permit = semaphore.acquire().await.unwrap();
+                        input.next().await
+                    };
+                    match maybe_batch {
+                        Some(Ok(batch)) => {
+                            let _ = sender.send(Ok(batch)).await;
+                        }
+                        Some(Err(e)) => {
+                            let _ = sender.send(Err(e)).await;
+                            break;
+                        }
+                        None => break,
+                    }
+                }
+            }
+            .instrument(span),
+        );
     }
 
     pub(crate) fn total_rows(&self) -> usize {
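
The `spawn_global` rewrites above exist because a spawned task does not inherit the spawning task's span: the span has to be created up front and attached to the future with `Instrument::instrument`, which enters it around every poll of that future. A reduced sketch using plain `tokio::spawn` in place of the project's runtime helper:

    use tracing::Instrument;

    async fn scan_task() {
        // Work here, including every await point, is recorded under the
        // span attached below.
    }

    fn spawn_instrumented(region_id: u64) {
        let span = tracing::info_span!("parallel_scan_task", region_id, stream_kind = "batch");
        tokio::spawn(scan_task().instrument(span));
    }
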
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 64d0f96fa2..0605c37096 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -22,6 +22,7 @@ use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
 use async_stream::try_stream;
+use common_telemetry::tracing;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::timestamp::timestamp_array_to_primitive;
@@ -934,6 +935,15 @@ pub(crate) struct SeriesDistributorMetrics {
 }
 
 /// Scans memtable ranges at `index`.
+#[tracing::instrument(
+    skip_all,
+    fields(
+        region_id = %stream_ctx.input.region_metadata().region_id,
+        file_or_mem_index = %index.index,
+        row_group_index = %index.row_group_index,
+        source = "mem"
+    )
+)]
 pub(crate) fn scan_mem_ranges(
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
@@ -964,6 +974,14 @@ pub(crate) fn scan_mem_ranges(
 }
 
 /// Scans memtable ranges at `index` using flat format that returns RecordBatch.
+#[tracing::instrument(
+    skip_all,
+    fields(
+        region_id = %stream_ctx.input.region_metadata().region_id,
+        row_group_index = %index.index,
+        source = "mem_flat"
+    )
+)]
 pub(crate) fn scan_flat_mem_ranges(
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
@@ -1072,6 +1090,14 @@ fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
 }
 
 /// Scans file ranges at `index`.
+#[tracing::instrument(
+    skip_all,
+    fields(
+        region_id = %stream_ctx.input.region_metadata().region_id,
+        row_group_index = %index.index,
+        source = read_type
+    )
+)]
 pub(crate) async fn scan_file_ranges(
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
@@ -1117,6 +1143,14 @@ pub(crate) async fn scan_file_ranges(
 }
 
 /// Scans file ranges at `index` using flat reader that returns RecordBatch.
+#[tracing::instrument(
+    skip_all,
+    fields(
+        region_id = %stream_ctx.input.region_metadata().region_id,
+        row_group_index = %index.index,
+        source = read_type
+    )
+)]
 pub(crate) async fn scan_flat_file_ranges(
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
@@ -1162,6 +1196,10 @@ pub(crate) async fn scan_flat_file_ranges(
 }
 
 /// Build the stream of scanning the input [`FileRange`]s.
+#[tracing::instrument(
+    skip_all,
+    fields(read_type = read_type, range_count = ranges.len())
+)]
 pub fn build_file_range_scan_stream(
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
@@ -1222,6 +1260,10 @@ pub fn build_file_range_scan_stream(
 }
 
 /// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
+#[tracing::instrument(
+    skip_all,
+    fields(read_type = read_type, range_count = ranges.len())
+)]
 pub fn build_flat_file_range_scan_stream(
     _stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
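
One detail worth noting in these attributes: `fields(...)` expressions such as `range_count = ranges.len()` are evaluated eagerly when the instrumented function is entered, even though the stream it builds produces data lazily. A compact illustration (types are stand-ins):

    use tracing::instrument;

    // `read_type` and `range_count` are captured at call time; the returned
    // closure, standing in for the lazy scan stream, runs much later.
    #[instrument(skip_all, fields(read_type = read_type, range_count = ranges.len()))]
    fn build_range_scan_stream(read_type: &'static str, ranges: Vec<(u64, u64)>) -> impl FnOnce() -> usize {
        move || ranges.len()
    }
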
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 41f6dc7772..0031ec77af 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -90,6 +90,10 @@ impl SeqScan {
     ///
     /// The returned stream is not partitioned and will contains all the data. If want
     /// partitioned scan, use [`RegionScanner::scan_partition`].
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     pub fn build_stream(&self) -> Result<SendableRecordBatchStream> {
         let metrics_set = ExecutionPlanMetricsSet::new();
         let streams = (0..self.properties.partitions.len())
@@ -373,6 +377,13 @@ impl SeqScan {
         )))
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_batch_in_partition(
         &self,
         partition: usize,
@@ -473,6 +484,13 @@ impl SeqScan {
         Ok(Box::pin(stream))
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_flat_batch_in_partition(
         &self,
         partition: usize,
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index c485348806..cb18a5fdce 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -22,6 +22,7 @@ use async_stream::try_stream;
 use common_error::ext::BoxedError;
 use common_recordbatch::util::ChainedRecordBatchStream;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
+use common_telemetry::tracing::{self, Instrument};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::arrow::array::BinaryArray;
@@ -91,6 +92,13 @@ impl SeriesScan {
         }
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
    fn scan_partition_impl(
         &self,
         ctx: &QueryScanContext,
@@ -122,6 +130,13 @@ impl SeriesScan {
         )))
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_batch_in_partition(
         &self,
         ctx: &QueryScanContext,
@@ -183,6 +198,10 @@ impl SeriesScan {
     }
 
     /// Starts the distributor if the receiver list is empty.
+    #[tracing::instrument(
+        skip(self, metrics_set, metrics_list),
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     fn maybe_start_distributor(
         &self,
         metrics_set: &ExecutionPlanMetricsSet,
@@ -202,14 +221,23 @@ impl SeriesScan {
             metrics_set: metrics_set.clone(),
             metrics_list: metrics_list.clone(),
         };
-        common_runtime::spawn_global(async move {
-            distributor.execute().await;
-        });
+        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
+        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
+        common_runtime::spawn_global(
+            async move {
+                distributor.execute().await;
+            }
+            .instrument(span),
+        );
 
         *rx_list = receivers;
     }
 
     /// Scans the region and returns a stream.
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
         let part_num = self.properties.num_partitions();
         let metrics_set = ExecutionPlanMetricsSet::default();
@@ -388,6 +416,10 @@ struct SeriesDistributor {
 
 impl SeriesDistributor {
     /// Executes the distributor.
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     async fn execute(&mut self) {
         let result = if self.stream_ctx.input.flat_format {
             self.scan_partitions_flat().await
@@ -401,6 +433,10 @@ impl SeriesDistributor {
     }
 
     /// Scans all parts in flat format using FlatSeriesBatchDivider.
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     async fn scan_partitions_flat(&mut self) -> Result<()> {
         let part_metrics = new_partition_metrics(
             &self.stream_ctx,
@@ -487,6 +523,10 @@ impl SeriesDistributor {
     }
 
     /// Scans all parts.
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     async fn scan_partitions(&mut self) -> Result<()> {
         let part_metrics = new_partition_metrics(
             &self.stream_ctx,
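
`skip(self, metrics_set, metrics_list)` above shows the selective form of skipping: the named arguments are excluded while the rest are still recorded, and `fields(...)` can re-introduce data derived from a skipped argument. A small illustration (hypothetical method):

    use tracing::instrument;

    struct Scanner {
        region_id: u64,
    }

    impl Scanner {
        // `self` and `buf` are skipped; `partition` is still recorded, and
        // `region_id` is re-derived from the skipped `self`.
        #[instrument(skip(self, buf), fields(region_id = self.region_id))]
        fn scan(&self, buf: &mut Vec<u8>, partition: usize) {
            let _ = (buf, partition);
        }
    }
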
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index c0a48f60da..ee074ef9d7 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -21,6 +21,7 @@ use std::time::Instant;
 
 use async_stream::{stream, try_stream};
 use common_error::ext::BoxedError;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
+use common_telemetry::tracing;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::arrow::record_batch::RecordBatch;
@@ -71,6 +72,10 @@ impl UnorderedScan {
     }
 
     /// Scans the region and returns a stream.
+    #[tracing::instrument(
+        skip_all,
+        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
+    )]
     pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
         let metrics_set = ExecutionPlanMetricsSet::new();
         let part_num = self.properties.num_partitions();
@@ -92,6 +97,13 @@ impl UnorderedScan {
     }
 
     /// Scans a [PartitionRange] by its `identifier` and returns a stream.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %stream_ctx.input.region_metadata().region_id,
+            part_range_id = part_range_id
+        )
+    )]
     fn scan_partition_range(
         stream_ctx: Arc<StreamContext>,
         part_range_id: usize,
@@ -138,6 +150,13 @@ impl UnorderedScan {
     }
 
     /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %stream_ctx.input.region_metadata().region_id,
+            part_range_id = part_range_id
+        )
+    )]
     fn scan_flat_partition_range(
         stream_ctx: Arc<StreamContext>,
         part_range_id: usize,
@@ -214,6 +233,13 @@ impl UnorderedScan {
         part_metrics
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_partition_impl(
         &self,
         ctx: &QueryScanContext,
@@ -252,6 +278,13 @@ impl UnorderedScan {
         )))
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_batch_in_partition(
         &self,
         partition: usize,
@@ -336,6 +369,13 @@ impl UnorderedScan {
         Ok(Box::pin(stream))
     }
 
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
+            partition = partition
+        )
+    )]
     fn scan_flat_batch_in_partition(
         &self,
         partition: usize,
diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs
index 547e67b66d..e36874e97d 100644
--- a/src/mito2/src/sst/index/bloom_filter/applier.rs
+++ b/src/mito2/src/sst/index/bloom_filter/applier.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use common_base::range_read::RangeReader;
-use common_telemetry::warn;
+use common_telemetry::{tracing, warn};
 use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
 use index::bloom_filter::reader::{
     BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
@@ -193,11 +193,15 @@ impl BloomFilterIndexApplier {
     /// Row group id existing in the returned result means that the row group is searched.
     /// Empty ranges means that the row group is searched but no rows are found.
     ///
     /// # Arguments
     /// * `file_id` - The region file ID to apply predicates to
     /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
     /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
     /// * `metrics` - Optional mutable reference to collect metrics on demand
+    #[tracing::instrument(
+        skip_all,
+        fields(file_id = %file_id)
+    )]
     pub async fn apply(
         &self,
         file_id: RegionIndexId,
diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs
index 54a8e11e89..5ccf4e57a4 100644
--- a/src/mito2/src/sst/index/fulltext_index/applier.rs
+++ b/src/mito2/src/sst/index/fulltext_index/applier.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use common_base::range_read::RangeReader;
-use common_telemetry::warn;
+use common_telemetry::{tracing, warn};
 use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
 use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
 use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
@@ -219,6 +219,10 @@ impl FulltextIndexApplier {
     /// * `file_id` - The region file ID to apply predicates to
     /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
     /// * `metrics` - Optional mutable reference to collect metrics on demand
+    #[tracing::instrument(
+        skip_all,
+        fields(file_id = %file_id)
+    )]
     pub async fn apply_fine(
         &self,
         file_id: RegionIndexId,
@@ -354,6 +358,11 @@ impl FulltextIndexApplier {
     /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
     /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
     /// * `metrics` - Optional mutable reference to collect metrics on demand
+    #[allow(clippy::type_complexity)]
+    #[tracing::instrument(
+        skip_all,
+        fields(file_id = %file_id)
+    )]
     pub async fn apply_coarse(
         &self,
         file_id: RegionIndexId,
diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs
index e4a0f25398..f4c80051dd 100644
--- a/src/mito2/src/sst/index/inverted_index/applier.rs
+++ b/src/mito2/src/sst/index/inverted_index/applier.rs
@@ -192,6 +192,10 @@ impl InvertedIndexApplier {
     /// * `file_id` - The region file ID to apply predicates to
     /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
     /// * `metrics` - Optional mutable reference to collect metrics on demand
+    #[tracing::instrument(
+        skip_all,
+        fields(file_id = %file_id)
+    )]
     pub async fn apply(
         &self,
         file_id: RegionIndexId,
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index d3c2d65dd0..0484892b21 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -21,7 +21,7 @@ use std::time::{Duration, Instant};
 
 use api::v1::SemanticType;
 use async_trait::async_trait;
 use common_recordbatch::filter::SimpleFilterEvaluator;
-use common_telemetry::{debug, warn};
+use common_telemetry::{debug, tracing, warn};
 use datafusion_expr::Expr;
 use datatypes::arrow::array::ArrayRef;
 use datatypes::arrow::error::ArrowError;
@@ -249,6 +249,13 @@ impl ParquetReaderBuilder {
     /// Builds a [ParquetReader].
     ///
     /// This needs to perform IO operation.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.file_handle.region_id(),
+            file_id = %self.file_handle.file_id()
+        )
+    )]
     pub async fn build(&self) -> Result<ParquetReader> {
         let mut metrics = ReaderMetrics::default();
 
@@ -259,6 +266,13 @@ impl ParquetReaderBuilder {
     /// Builds a [FileRangeContext] and collects row groups to read.
     ///
     /// This needs to perform IO operation.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.file_handle.region_id(),
+            file_id = %self.file_handle.file_id()
+        )
+    )]
     pub(crate) async fn build_reader_input(
         &self,
         metrics: &mut ReaderMetrics,
@@ -466,6 +480,13 @@ impl ParquetReaderBuilder {
     }
 
     /// Computes row groups to read, along with their respective row selections.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.file_handle.region_id(),
+            file_id = %self.file_handle.file_id()
+        )
+    )]
     async fn row_groups_to_read(
         &self,
         read_format: &ReadFormat,
@@ -1420,6 +1441,13 @@ pub struct ParquetReader {
 
 #[async_trait]
 impl BatchReader for ParquetReader {
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %self.context.reader_builder().file_handle.region_id(),
+            file_id = %self.context.reader_builder().file_handle.file_id()
+        )
+    )]
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
         let ReaderState::Readable(reader) = &mut self.reader_state else {
             return Ok(None);
@@ -1491,6 +1519,13 @@ impl Drop for ParquetReader {
 
 impl ParquetReader {
     /// Creates a new reader.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            region_id = %context.reader_builder().file_handle.region_id(),
+            file_id = %context.reader_builder().file_handle.file_id()
+        )
+    )]
     pub(crate) async fn new(
         context: FileRangeContextRef,
         mut selection: RowGroupSelection,
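
On `async fn`s like `apply` and `next_batch` above, `#[tracing::instrument]` instruments the returned future rather than the synchronous call: the span is entered on each poll and exited across await points, which is what makes it correct for readers that suspend on IO. A sketch:

    use tracing::instrument;

    #[instrument(skip_all, fields(file_id = %file_id))]
    async fn apply(file_id: u64) -> Option<Vec<u8>> {
        // The await below suspends the future; the span is exited while it
        // is suspended and re-entered when polling resumes.
        tokio::task::yield_now().await;
        None
    }
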
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 9c81b8d524..391490aa55 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -48,6 +48,7 @@ use snafu::{OptionExt, ResultExt, ensure};
 use sqlparser::ast::AnalyzeFormat;
 use table::TableRef;
 use table::requests::{DeleteRequest, InsertRequest};
+use tracing::Span;
 
 use crate::analyze::DistAnalyzeExec;
 pub use crate::datafusion::planner::DfContextProviderAdapter;
@@ -606,6 +607,7 @@ impl QueryExecutor for DatafusionQueryEngine {
 
         let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
         let task_ctx = ctx.build_task_ctx();
+        let span = Span::current();
 
         match plan.properties().output_partitioning().partition_count() {
             0 => {
@@ -622,7 +624,7 @@ impl QueryExecutor for DatafusionQueryEngine {
                     .context(error::DatafusionSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
-                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
+                let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
                     .context(error::ConvertDfRecordBatchStreamSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
@@ -655,7 +657,7 @@ impl QueryExecutor for DatafusionQueryEngine {
                     .context(error::DatafusionSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
-                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
+                let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
                     .context(error::ConvertDfRecordBatchStreamSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index a4dd5243a7..9625bfec89 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -47,6 +47,7 @@ use session::context::QueryContextRef;
 use store_api::storage::RegionId;
 use table::table_name::TableName;
 use tokio::time::Instant;
+use tracing::{Instrument, Span};
 
 use crate::dist_plan::analyzer::AliasMapping;
 use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
@@ -284,6 +285,12 @@ impl MergeScanExec {
                 .step_by(target_partition)
                 .copied()
             {
+                let region_span = tracing_context.attach(tracing::info_span!(
+                    parent: &Span::current(),
+                    "merge_scan_region",
+                    region_id = %region_id,
+                    partition = partition
+                ));
                 let request = QueryRequest {
                     header: Some(RegionRequestHeader {
                         tracing_context: tracing_context.to_w3c(),
@@ -306,6 +313,7 @@ impl MergeScanExec {
 
                 let mut stream = region_query_handler
                     .do_get(read_preference, request)
+                    .instrument(region_span.clone())
                     .await
                     .map_err(|e| {
                         MERGE_SCAN_ERRORS_TOTAL.inc();
@@ -317,7 +325,7 @@ impl MergeScanExec {
                 let mut poll_duration = Duration::ZERO;
 
                 let mut poll_timer = Instant::now();
-                while let Some(batch) = stream.next().await {
+                while let Some(batch) = stream.next().instrument(region_span.clone()).await {
                     let poll_elapsed = poll_timer.elapsed();
                     poll_duration += poll_elapsed;
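
The merge-scan and standalone changes are two ends of the same propagation mechanism: the frontend serializes the active trace context into the request header as W3C data, and the datanode rebuilds it and parents its local span on the remote one. Condensed from the hunks above, using the `common_telemetry` API as imported in these files (`from_current_span` is an assumption here; only `to_w3c`, `from_w3c`, and `attach` appear in the diff):

    use common_telemetry::tracing::{self, Span};
    use common_telemetry::tracing_context::{TracingContext, W3cTrace};

    // Frontend side: serialize the current trace context for a request header.
    fn context_for_header() -> W3cTrace {
        TracingContext::from_current_span().to_w3c()
    }

    // Datanode side: rebuild the remote context and parent a local span on it,
    // so datanode spans appear under the frontend's trace.
    fn span_for_request(w3c: &W3cTrace) -> Span {
        TracingContext::from_w3c(w3c).attach(tracing::info_span!("handle_query"))
    }
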
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 5db824e995..2723b4b12c 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -339,8 +339,11 @@ impl ExecutionPlan for RegionScanExec {
         context: Arc<TaskContext>,
     ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
         let tracing_context = TracingContext::from_json(context.session_id().as_str());
-        let span =
-            tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region"));
+        let span = tracing_context.attach(common_telemetry::tracing::info_span!(
+            "read_from_region",
+            partition = partition
+        ));
+        let enter = span.enter();
 
         let ctx = QueryScanContext {
             explain_verbose: self.explain_verbose,
@@ -358,6 +361,7 @@ impl ExecutionPlan for RegionScanExec {
             stream
         };
 
+        drop(enter);
         let stream_metrics = StreamMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
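
The explicit `let enter = span.enter(); ... drop(enter);` in `execute` scopes the span to the synchronous stream setup only: dropping the guard before returning avoids holding the span open for the stream's entire lifetime, while the stream itself re-enters the span per poll through the wrappers shown earlier. A minimal sketch of the shape:

    use tracing::Span;

    fn execute(span: Span) -> impl FnOnce() {
        let enter = span.enter();
        // ... synchronous setup recorded under `span` ...
        drop(enter); // exit the span before handing back the lazy part
        move || { /* polled later, outside the guard */ }
    }
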