feat: tune query traces (#7524)

* feat: add partition and region id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: instrument mito

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* connect region scan span

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* instrument streams

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-07 16:11:09 +08:00
committed by GitHub
parent 59867cd5b6
commit d39895a970
21 changed files with 344 additions and 56 deletions

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
@@ -163,6 +164,7 @@ impl DataSource for SystemTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Box::pin(stream))

View File

@@ -37,6 +37,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
@@ -456,6 +457,7 @@ impl Database {
stream,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
}

View File

@@ -30,6 +30,7 @@ use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
@@ -242,6 +243,7 @@ impl RegionRequester {
stream,
output_ordering: None,
metrics,
span: Span::current(),
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -21,6 +21,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use common_base::readable_size::ReadableSize;
use common_telemetry::tracing::{Span, info_span};
use common_time::util::format_nanoseconds_human_readable;
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
@@ -218,6 +219,7 @@ pub struct RecordBatchStreamAdapter {
metrics_2: Metrics,
/// Display plan and metrics in verbose mode.
explain_verbose: bool,
span: Span,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
@@ -238,22 +240,21 @@ impl RecordBatchStreamAdapter {
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
span: Span::current(),
})
}
pub fn try_new_with_metrics_and_df_plan(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
df_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
Ok(Self {
schema,
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
span: subspan,
})
}
@@ -300,6 +301,8 @@ impl Stream for RecordBatchStreamAdapter {
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
let poll_span = info_span!(parent: &self.span, "poll_next");
let _entered = poll_span.enter();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {

View File

@@ -29,6 +29,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
use common_telemetry::tracing::Span;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
use datatypes::arrow::compute::SortOptions;
@@ -370,6 +371,7 @@ pub struct RecordBatchStreamWrapper<S> {
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
pub span: Span,
}
impl<S> RecordBatchStreamWrapper<S> {
@@ -380,6 +382,7 @@ impl<S> RecordBatchStreamWrapper<S> {
stream,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
}
}
}
@@ -408,6 +411,7 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStream
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let _entered = self.span.clone().entered();
Pin::new(&mut self.stream).poll_next(ctx)
}
}

View File

@@ -91,6 +91,7 @@ use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use tracing::Span;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
@@ -508,6 +509,7 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
stream: s,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
}

View File

@@ -40,6 +40,7 @@ use servers::query_handler::{
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tracing::instrument;
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
@@ -78,6 +79,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
Ok(ResponseType::try_from(*response_type).unwrap())
}
#[instrument(skip_all, fields(table_name))]
async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
let OutputData::Stream(stream) = output.data else {
unreachable!()
@@ -194,6 +196,7 @@ impl PromStoreProtocolHandler for Instance {
Ok(output)
}
#[instrument(skip_all, fields(table_name))]
async fn read(
&self,
request: ReadRequest,

View File

@@ -97,12 +97,16 @@ impl Datanode for RegionInvoker {
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let region_id = request.region_id.to_string();
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_query"));
.attach(tracing::info_span!(
"RegionInvoker::handle_query",
region_id = region_id
));
self.region_server
.handle_read(request)
.trace(span)

View File

@@ -393,6 +393,7 @@ impl MitoEngine {
}
/// Scans a region.
#[tracing::instrument(skip_all, fields(region_id = %region_id))]
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.scan_region(region_id, request)
}
@@ -972,6 +973,7 @@ impl EngineInner {
}
/// Handles the scan `request` and returns a [ScanRegion].
#[tracing::instrument(skip_all, fields(region_id = %region_id))]
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.

View File

@@ -24,6 +24,7 @@ use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
@@ -319,6 +320,7 @@ impl ScanRegion {
}
/// Returns a [Scanner] to scan the region.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
pub(crate) async fn scanner(self) -> Result<Scanner> {
if self.use_series_scan() {
self.series_scan().await.map(Scanner::Series)
@@ -332,7 +334,11 @@ impl ScanRegion {
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
#[tracing::instrument(
level = tracing::Level::DEBUG,
skip_all,
fields(region_id = %self.region_id())
)]
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
if self.use_series_scan() {
self.series_scan()
@@ -348,18 +354,21 @@ impl ScanRegion {
}
/// Scan sequentially.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input().await?.with_compaction(false);
Ok(SeqScan::new(input))
}
/// Unordered scan.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input().await?;
Ok(UnorderedScan::new(input))
}
/// Scans by series.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
let input = self.scan_input().await?;
Ok(SeriesScan::new(input))
@@ -390,6 +399,7 @@ impl ScanRegion {
}
/// Creates a scan input.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
async fn scan_input(mut self) -> Result<ScanInput> {
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
let time_range = self.build_time_range_predicate();
@@ -907,6 +917,13 @@ impl ScanInput {
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
#[tracing::instrument(
skip(self, sources, semaphore),
fields(
region_id = %self.region_metadata().region_id,
source_count = sources.len()
)
)]
pub(crate) fn create_parallel_sources(
&self,
sources: Vec<Source>,
@@ -956,6 +973,13 @@ impl ScanInput {
}
/// Prunes a file to scan and returns the builder to build readers.
#[tracing::instrument(
skip_all,
fields(
region_id = %self.region_metadata().region_id,
file_id = %file.file_id()
)
)]
pub async fn prune_file(
&self,
file: &FileHandle,
@@ -1021,38 +1045,58 @@ impl ScanInput {
}
/// Scans the input source in another task and sends batches to the sender.
#[tracing::instrument(
skip(self, input, semaphore, sender),
fields(region_id = %self.region_metadata().region_id)
)]
pub(crate) fn spawn_scan_task(
&self,
mut input: Source,
semaphore: Arc<Semaphore>,
sender: mpsc::Sender<Result<Batch>>,
) {
common_runtime::spawn_global(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
let region_id = self.region_metadata().region_id;
let span = tracing::info_span!(
"ScanInput::parallel_scan_task",
region_id = %region_id,
stream_kind = "batch"
);
common_runtime::spawn_global(
async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
}
}
}
}
});
.instrument(span),
);
}
/// Scans flat sources (RecordBatch streams) in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
#[tracing::instrument(
skip(self, sources, semaphore),
fields(
region_id = %self.region_metadata().region_id,
source_count = sources.len()
)
)]
pub(crate) fn create_parallel_flat_sources(
&self,
sources: Vec<BoxedRecordBatchStream>,
@@ -1076,33 +1120,46 @@ impl ScanInput {
}
/// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
#[tracing::instrument(
skip(self, input, semaphore, sender),
fields(region_id = %self.region_metadata().region_id)
)]
pub(crate) fn spawn_flat_scan_task(
&self,
mut input: BoxedRecordBatchStream,
semaphore: Arc<Semaphore>,
sender: mpsc::Sender<Result<RecordBatch>>,
) {
common_runtime::spawn_global(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next().await
};
match maybe_batch {
Some(Ok(batch)) => {
let _ = sender.send(Ok(batch)).await;
let region_id = self.region_metadata().region_id;
let span = tracing::info_span!(
"ScanInput::parallel_scan_task",
region_id = %region_id,
stream_kind = "flat"
);
common_runtime::spawn_global(
async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next().await
};
match maybe_batch {
Some(Ok(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Some(Err(e)) => {
let _ = sender.send(Err(e)).await;
break;
}
None => break,
}
Some(Err(e)) => {
let _ = sender.send(Err(e)).await;
break;
}
None => break,
}
}
});
.instrument(span),
);
}
pub(crate) fn total_rows(&self) -> usize {

View File

@@ -22,6 +22,7 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
@@ -934,6 +935,15 @@ pub(crate) struct SeriesDistributorMetrics {
}
/// Scans memtable ranges at `index`.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
file_or_mem_index = %index.index,
row_group_index = %index.row_group_index,
source = "mem"
)
)]
pub(crate) fn scan_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
@@ -964,6 +974,14 @@ pub(crate) fn scan_mem_ranges(
}
/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
row_group_index = %index.index,
source = "mem_flat"
)
)]
pub(crate) fn scan_flat_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
@@ -1072,6 +1090,14 @@ fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
}
/// Scans file ranges at `index`.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
row_group_index = %index.index,
source = read_type
)
)]
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
@@ -1117,6 +1143,14 @@ pub(crate) async fn scan_file_ranges(
}
/// Scans file ranges at `index` using flat reader that returns RecordBatch.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
row_group_index = %index.index,
source = read_type
)
)]
pub(crate) async fn scan_flat_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
@@ -1162,6 +1196,10 @@ pub(crate) async fn scan_flat_file_ranges(
}
/// Build the stream of scanning the input [`FileRange`]s.
#[tracing::instrument(
skip_all,
fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
@@ -1222,6 +1260,10 @@ pub fn build_file_range_scan_stream(
}
/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
#[tracing::instrument(
skip_all,
fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_flat_file_range_scan_stream(
_stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,

View File

@@ -90,6 +90,10 @@ impl SeqScan {
///
/// The returned stream is not partitioned and will contains all the data. If want
/// partitioned scan, use [`RegionScanner::scan_partition`].
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
@@ -373,6 +377,13 @@ impl SeqScan {
)))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_batch_in_partition(
&self,
partition: usize,
@@ -473,6 +484,13 @@ impl SeqScan {
Ok(Box::pin(stream))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_flat_batch_in_partition(
&self,
partition: usize,

View File

@@ -22,6 +22,7 @@ use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::{self, Instrument};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
@@ -91,6 +92,13 @@ impl SeriesScan {
}
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_partition_impl(
&self,
ctx: &QueryScanContext,
@@ -122,6 +130,13 @@ impl SeriesScan {
)))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_batch_in_partition(
&self,
ctx: &QueryScanContext,
@@ -183,6 +198,10 @@ impl SeriesScan {
}
/// Starts the distributor if the receiver list is empty.
#[tracing::instrument(
skip(self, metrics_set, metrics_list),
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
fn maybe_start_distributor(
&self,
metrics_set: &ExecutionPlanMetricsSet,
@@ -202,14 +221,23 @@ impl SeriesScan {
metrics_set: metrics_set.clone(),
metrics_list: metrics_list.clone(),
};
common_runtime::spawn_global(async move {
distributor.execute().await;
});
let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
common_runtime::spawn_global(
async move {
distributor.execute().await;
}
.instrument(span),
);
*rx_list = receivers;
}
/// Scans the region and returns a stream.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let part_num = self.properties.num_partitions();
let metrics_set = ExecutionPlanMetricsSet::default();
@@ -388,6 +416,10 @@ struct SeriesDistributor {
impl SeriesDistributor {
/// Executes the distributor.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
async fn execute(&mut self) {
let result = if self.stream_ctx.input.flat_format {
self.scan_partitions_flat().await
@@ -401,6 +433,10 @@ impl SeriesDistributor {
}
/// Scans all parts in flat format using FlatSeriesBatchDivider.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
async fn scan_partitions_flat(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(
&self.stream_ctx,
@@ -487,6 +523,10 @@ impl SeriesDistributor {
}
/// Scans all parts.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(
&self.stream_ctx,

View File

@@ -21,6 +21,7 @@ use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
@@ -71,6 +72,10 @@ impl UnorderedScan {
}
/// Scans the region and returns a stream.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let metrics_set = ExecutionPlanMetricsSet::new();
let part_num = self.properties.num_partitions();
@@ -92,6 +97,13 @@ impl UnorderedScan {
}
/// Scans a [PartitionRange] by its `identifier` and returns a stream.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
part_range_id = part_range_id
)
)]
fn scan_partition_range(
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
@@ -138,6 +150,13 @@ impl UnorderedScan {
}
/// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
part_range_id = part_range_id
)
)]
fn scan_flat_partition_range(
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
@@ -214,6 +233,13 @@ impl UnorderedScan {
part_metrics
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_partition_impl(
&self,
ctx: &QueryScanContext,
@@ -252,6 +278,13 @@ impl UnorderedScan {
)))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_batch_in_partition(
&self,
partition: usize,
@@ -336,6 +369,13 @@ impl UnorderedScan {
Ok(Box::pin(stream))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_flat_batch_in_partition(
&self,
partition: usize,

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use common_telemetry::{tracing, warn};
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
@@ -193,11 +193,16 @@ impl BloomFilterIndexApplier {
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
#[tracing::instrument(
skip_all,
fields(file_id = %file_id)
)]
pub async fn apply(
&self,
file_id: RegionIndexId,

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use common_telemetry::{tracing, warn};
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
@@ -219,6 +219,10 @@ impl FulltextIndexApplier {
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
#[tracing::instrument(
skip_all,
fields(file_id = %file_id)
)]
pub async fn apply_fine(
&self,
file_id: RegionIndexId,
@@ -354,6 +358,11 @@ impl FulltextIndexApplier {
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
#[allow(clippy::type_complexity)]
#[tracing::instrument(
skip_all,
fields(file_id = %file_id)
)]
pub async fn apply_coarse(
&self,
file_id: RegionIndexId,

View File

@@ -192,6 +192,10 @@ impl InvertedIndexApplier {
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
#[tracing::instrument(
skip_all,
fields(file_id = %file_id)
)]
pub async fn apply(
&self,
file_id: RegionIndexId,

View File

@@ -21,7 +21,7 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
@@ -249,6 +249,13 @@ impl ParquetReaderBuilder {
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
#[tracing::instrument(
skip_all,
fields(
region_id = %self.file_handle.region_id(),
file_id = %self.file_handle.file_id()
)
)]
pub async fn build(&self) -> Result<ParquetReader> {
let mut metrics = ReaderMetrics::default();
@@ -259,6 +266,13 @@ impl ParquetReaderBuilder {
/// Builds a [FileRangeContext] and collects row groups to read.
///
/// This needs to perform IO operation.
#[tracing::instrument(
skip_all,
fields(
region_id = %self.file_handle.region_id(),
file_id = %self.file_handle.file_id()
)
)]
pub(crate) async fn build_reader_input(
&self,
metrics: &mut ReaderMetrics,
@@ -466,6 +480,13 @@ impl ParquetReaderBuilder {
}
/// Computes row groups to read, along with their respective row selections.
#[tracing::instrument(
skip_all,
fields(
region_id = %self.file_handle.region_id(),
file_id = %self.file_handle.file_id()
)
)]
async fn row_groups_to_read(
&self,
read_format: &ReadFormat,
@@ -1420,6 +1441,13 @@ pub struct ParquetReader {
#[async_trait]
impl BatchReader for ParquetReader {
#[tracing::instrument(
skip_all,
fields(
region_id = %self.context.reader_builder().file_handle.region_id(),
file_id = %self.context.reader_builder().file_handle.file_id()
)
)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let ReaderState::Readable(reader) = &mut self.reader_state else {
return Ok(None);
@@ -1491,6 +1519,13 @@ impl Drop for ParquetReader {
impl ParquetReader {
/// Creates a new reader.
#[tracing::instrument(
skip_all,
fields(
region_id = %context.reader_builder().file_handle.region_id(),
file_id = %context.reader_builder().file_handle.file_id()
)
)]
pub(crate) async fn new(
context: FileRangeContextRef,
mut selection: RowGroupSelection,

View File

@@ -48,6 +48,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::AnalyzeFormat;
use table::TableRef;
use table::requests::{DeleteRequest, InsertRequest};
use tracing::Span;
use crate::analyze::DistAnalyzeExec;
pub use crate::datafusion::planner::DfContextProviderAdapter;
@@ -606,6 +607,7 @@ impl QueryExecutor for DatafusionQueryEngine {
let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
let task_ctx = ctx.build_task_ctx();
let span = Span::current();
match plan.properties().output_partitioning().partition_count() {
0 => {
@@ -622,7 +624,7 @@ impl QueryExecutor for DatafusionQueryEngine {
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
@@ -655,7 +657,7 @@ impl QueryExecutor for DatafusionQueryEngine {
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

View File

@@ -47,6 +47,7 @@ use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
use tracing::{Instrument, Span};
use crate::dist_plan::analyzer::AliasMapping;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
@@ -284,6 +285,12 @@ impl MergeScanExec {
.step_by(target_partition)
.copied()
{
let region_span = tracing_context.attach(tracing::info_span!(
parent: &Span::current(),
"merge_scan_region",
region_id = %region_id,
partition = partition
));
let request = QueryRequest {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
@@ -306,6 +313,7 @@ impl MergeScanExec {
let mut stream = region_query_handler
.do_get(read_preference, request)
.instrument(region_span.clone())
.await
.map_err(|e| {
MERGE_SCAN_ERRORS_TOTAL.inc();
@@ -317,7 +325,7 @@ impl MergeScanExec {
let mut poll_duration = Duration::ZERO;
let mut poll_timer = Instant::now();
while let Some(batch) = stream.next().await {
while let Some(batch) = stream.next().instrument(region_span.clone()).await {
let poll_elapsed = poll_timer.elapsed();
poll_duration += poll_elapsed;

View File

@@ -339,8 +339,11 @@ impl ExecutionPlan for RegionScanExec {
context: Arc<TaskContext>,
) -> datafusion_common::Result<DfSendableRecordBatchStream> {
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let span =
tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region"));
let span = tracing_context.attach(common_telemetry::tracing::info_span!(
"read_from_region",
partition = partition
));
let enter = span.enter();
let ctx = QueryScanContext {
explain_verbose: self.explain_verbose,
@@ -358,6 +361,7 @@ impl ExecutionPlan for RegionScanExec {
stream
};
drop(enter);
let stream_metrics = StreamMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,