From 639b3ddc3e0f4bf16c5d55761479c709ab39f869 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 16 Jul 2025 09:59:10 -0700 Subject: [PATCH] feat: update partial execution metrics (#6499) * feat: update partial execution metrics Signed-off-by: Ruihang Xia * send data with metrics in distributed mode Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * only send partial metrics under VERBOSE flag Signed-off-by: Ruihang Xia * loop to while Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/client/src/region.rs | 63 ++++++++++++++++++++++++--- src/common/recordbatch/src/adapter.rs | 19 +++++++- src/datanode/src/region_server.rs | 48 ++++++++++---------- src/pipeline/src/etl.rs | 3 -- src/query/src/dist_plan/merge_scan.rs | 20 +++++++-- src/servers/src/grpc/flight.rs | 19 ++++++-- src/servers/src/grpc/flight/stream.rs | 19 +++++++- tests-integration/src/grpc.rs | 13 +++--- tests-integration/src/instance.rs | 13 +++--- 9 files changed, 165 insertions(+), 52 deletions(-) diff --git a/src/client/src/region.rs b/src/client/src/region.rs index d4a7e5b36c..2472aee2b2 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -160,19 +160,70 @@ impl RegionRequester { let _span = tracing_context.attach(common_telemetry::tracing::info_span!( "poll_flight_data_stream" )); - while let Some(flight_message) = flight_message_stream.next().await { - let flight_message = flight_message - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + + let mut buffered_message: Option = None; + let mut stream_ended = false; + + while !stream_ended { + // get the next message from the buffered message or read from the flight message stream + let flight_message_item = if let Some(msg) = buffered_message.take() { + Some(Ok(msg)) + } else { + flight_message_stream.next().await + }; + + let flight_message = match flight_message_item { + Some(Ok(message)) => message, + Some(Err(e)) => { + yield Err(BoxedError::new(e)).context(ExternalSnafu); + break; + } + None => break, + }; match flight_message { FlightMessage::RecordBatch(record_batch) => { - yield RecordBatch::try_from_df_record_batch( + let result_to_yield = RecordBatch::try_from_df_record_batch( schema_cloned.clone(), record_batch, - ) + ); + + // get the next message from the stream. normally it should be a metrics message. + if let Some(next_flight_message_result) = flight_message_stream.next().await + { + match next_flight_message_result { + Ok(FlightMessage::Metrics(s)) => { + let m = serde_json::from_str(&s).ok().map(Arc::new); + metrics_ref.swap(m); + } + Ok(FlightMessage::RecordBatch(rb)) => { + // for some reason it's not a metrics message, so we need to buffer this record batch + // and yield it in the next iteration. + buffered_message = Some(FlightMessage::RecordBatch(rb)); + } + Ok(_) => { + yield IllegalFlightMessagesSnafu { + reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message" + } + .fail() + .map_err(BoxedError::new) + .context(ExternalSnafu); + break; + } + Err(e) => { + yield Err(BoxedError::new(e)).context(ExternalSnafu); + break; + } + } + } else { + // the stream has ended + stream_ended = true; + } + + yield result_to_yield; } FlightMessage::Metrics(s) => { + // just a branch in case of some metrics message comes after other things. let m = serde_json::from_str(&s).ok().map(Arc::new); metrics_ref.swap(m); break; diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 3d27d7120f..ec3754a242 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -222,6 +222,7 @@ pub struct RecordBatchStreamAdapter { enum Metrics { Unavailable, Unresolved(Arc), + PartialResolved(Arc, RecordBatchMetrics), Resolved(RecordBatchMetrics), } @@ -275,7 +276,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter { fn metrics(&self) -> Option { match &self.metrics_2 { - Metrics::Resolved(metrics) => Some(metrics.clone()), + Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => { + Some(metrics.clone()) + } Metrics::Unavailable | Metrics::Unresolved(_) => None, } } @@ -299,13 +302,25 @@ impl Stream for RecordBatchStreamAdapter { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { let df_record_batch = df_record_batch?; + if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) = + &self.metrics_2 + { + let mut metric_collector = MetricCollector::new(self.explain_verbose); + accept(df_plan.as_ref(), &mut metric_collector).unwrap(); + self.metrics_2 = Metrics::PartialResolved( + df_plan.clone(), + metric_collector.record_batch_metrics, + ); + } Poll::Ready(Some(RecordBatch::try_from_df_record_batch( self.schema(), df_record_batch, ))) } Poll::Ready(None) => { - if let Metrics::Unresolved(df_plan) = &self.metrics_2 { + if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) = + &self.metrics_2 + { let mut metric_collector = MetricCollector::new(self.explain_verbose); accept(df_plan.as_ref(), &mut metric_collector).unwrap(); self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index a924908f76..ac51fdbe23 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -54,7 +54,7 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; use servers::grpc::FlightCompression; -use session::context::{QueryContextBuilder, QueryContextRef}; +use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, @@ -197,6 +197,7 @@ impl RegionServer { pub async fn handle_remote_read( &self, request: api::v1::region::QueryRequest, + query_ctx: QueryContextRef, ) -> Result { let _permit = if let Some(p) = &self.inner.parallelism { Some(p.acquire().await?) @@ -204,12 +205,6 @@ impl RegionServer { None }; - let query_ctx: QueryContextRef = request - .header - .as_ref() - .map(|h| Arc::new(h.into())) - .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build())); - let region_id = RegionId::from_u64(request.region_id); let provider = self.table_provider(region_id, Some(&query_ctx)).await?; let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider)); @@ -217,7 +212,7 @@ impl RegionServer { let decoder = self .inner .query_engine - .engine_context(query_ctx) + .engine_context(query_ctx.clone()) .new_plan_decoder() .context(NewPlanDecoderSnafu)?; @@ -227,11 +222,14 @@ impl RegionServer { .context(DecodeLogicalPlanSnafu)?; self.inner - .handle_read(QueryRequest { - header: request.header, - region_id, - plan, - }) + .handle_read( + QueryRequest { + header: request.header, + region_id, + plan, + }, + query_ctx, + ) .await } @@ -246,6 +244,7 @@ impl RegionServer { let ctx: Option = request.header.as_ref().map(|h| h.into()); let provider = self.table_provider(request.region_id, ctx.as_ref()).await?; + let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build())); struct RegionDataSourceInjector { source: Arc, @@ -274,7 +273,7 @@ impl RegionServer { .data; self.inner - .handle_read(QueryRequest { plan, ..request }) + .handle_read(QueryRequest { plan, ..request }, query_ctx) .await } @@ -588,9 +587,14 @@ impl FlightCraft for RegionServer { .as_ref() .map(|h| TracingContext::from_w3c(&h.tracing_context)) .unwrap_or_default(); + let query_ctx = request + .header + .as_ref() + .map(|h| Arc::new(QueryContext::from(h))) + .unwrap_or(QueryContext::arc()); let result = self - .handle_remote_read(request) + .handle_remote_read(request, query_ctx.clone()) .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; @@ -598,6 +602,7 @@ impl FlightCraft for RegionServer { result, tracing_context, self.flight_compression, + query_ctx, )); Ok(Response::new(stream)) } @@ -1177,16 +1182,13 @@ impl RegionServerInner { Ok(()) } - pub async fn handle_read(&self, request: QueryRequest) -> Result { + pub async fn handle_read( + &self, + request: QueryRequest, + query_ctx: QueryContextRef, + ) -> Result { // TODO(ruihang): add metrics and set trace id - // Build query context from gRPC header - let query_ctx: QueryContextRef = request - .header - .as_ref() - .map(|h| Arc::new(h.into())) - .unwrap_or_else(|| QueryContextBuilder::default().build().into()); - let result = self .query_engine .execute(request.plan, query_ctx) diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 80ddf18767..b24768145b 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -498,9 +498,6 @@ transform: .into_transformed() .unwrap(); - // println!("[DEBUG]schema_info: {:?}", schema_info.schema); - // println!("[DEBUG]re: {:?}", result.0.values); - assert_eq!(schema_info.schema.len(), result.0.values.len()); let test = vec![ ( diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index c98206dbc5..a4a59753e0 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use std::time::Duration; -use ahash::HashSet; +use ahash::{HashMap, HashSet}; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions}; use async_stream::stream; use common_catalog::parse_catalog_and_schema_from_db_string; @@ -143,7 +143,7 @@ pub struct MergeScanExec { metric: ExecutionPlanMetricsSet, properties: PlanProperties, /// Metrics from sub stages - sub_stage_metrics: Arc>>, + sub_stage_metrics: Arc>>, query_ctx: QueryContextRef, target_partition: usize, partition_cols: Vec, @@ -317,6 +317,12 @@ impl MergeScanExec { if let Some(mut first_consume_timer) = first_consume_timer.take() { first_consume_timer.stop(); } + + if let Some(metrics) = stream.metrics() { + let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap(); + sub_stage_metrics.insert(region_id, metrics); + } + yield Ok(batch); // reset poll timer poll_timer = Instant::now(); @@ -341,7 +347,8 @@ impl MergeScanExec { metric.record_greptime_exec_cost(value as usize); // record metrics from sub sgates - sub_stage_metrics_moved.lock().unwrap().push(metrics); + let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap(); + sub_stage_metrics.insert(region_id, metrics); } MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); @@ -409,7 +416,12 @@ impl MergeScanExec { } pub fn sub_stage_metrics(&self) -> Vec { - self.sub_stage_metrics.lock().unwrap().clone() + self.sub_stage_metrics + .lock() + .unwrap() + .values() + .cloned() + .collect() } pub fn partition_count(&self) -> usize { diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 8500181cc8..0264a3b46b 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -36,6 +36,7 @@ use common_telemetry::tracing_context::{FutureExt, TracingContext}; use futures::{future, ready, Stream}; use futures_util::{StreamExt, TryStreamExt}; use prost::Message; +use session::context::{QueryContext, QueryContextRef}; use snafu::{ensure, ResultExt}; use table::table_name::TableName; use tokio::sync::mpsc; @@ -188,6 +189,7 @@ impl FlightCraft for GreptimeRequestHandler { let ticket = request.into_inner().ticket; let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; + let query_ctx = QueryContext::arc(); // The Grpc protocol pass query by Flight. It needs to be wrapped under a span, in order to record stream let span = info_span!( @@ -202,6 +204,7 @@ impl FlightCraft for GreptimeRequestHandler { output, TracingContext::from_current_span(), flight_compression, + query_ctx, ); Ok(Response::new(stream)) } @@ -371,15 +374,25 @@ fn to_flight_data_stream( output: Output, tracing_context: TracingContext, flight_compression: FlightCompression, + query_ctx: QueryContextRef, ) -> TonicStream { match output.data { OutputData::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression); + let stream = FlightRecordBatchStream::new( + stream, + tracing_context, + flight_compression, + query_ctx, + ); Box::pin(stream) as _ } OutputData::RecordBatches(x) => { - let stream = - FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression); + let stream = FlightRecordBatchStream::new( + x.as_stream(), + tracing_context, + flight_compression, + query_ctx, + ); Box::pin(stream) as _ } OutputData::AffectedRows(rows) => { diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 1937fb6346..939fa8381c 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -27,6 +27,7 @@ use futures::channel::mpsc; use futures::channel::mpsc::Sender; use futures::{SinkExt, Stream, StreamExt}; use pin_project::{pin_project, pinned_drop}; +use session::context::QueryContextRef; use snafu::ResultExt; use tokio::task::JoinHandle; @@ -49,10 +50,12 @@ impl FlightRecordBatchStream { recordbatches: SendableRecordBatchStream, tracing_context: TracingContext, compression: FlightCompression, + query_ctx: QueryContextRef, ) -> Self { + let should_send_partial_metrics = query_ctx.explain_verbose(); let (tx, rx) = mpsc::channel::>(1); let join_handle = common_runtime::spawn_global(async move { - Self::flight_data_stream(recordbatches, tx) + Self::flight_data_stream(recordbatches, tx, should_send_partial_metrics) .trace(tracing_context.attach(info_span!("flight_data_stream"))) .await }); @@ -73,6 +76,7 @@ impl FlightRecordBatchStream { async fn flight_data_stream( mut recordbatches: SendableRecordBatchStream, mut tx: Sender>, + should_send_partial_metrics: bool, ) { let schema = recordbatches.schema().arrow_schema().clone(); if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await { @@ -92,6 +96,17 @@ impl FlightRecordBatchStream { warn!(e; "stop sending Flight data"); return; } + if should_send_partial_metrics { + if let Some(metrics) = recordbatches + .metrics() + .and_then(|m| serde_json::to_string(&m).ok()) + { + if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics))).await { + warn!(e; "stop sending Flight data"); + return; + } + } + } } Err(e) => { if e.status_code().should_log_error() { @@ -171,6 +186,7 @@ mod test { use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; use futures::StreamExt; + use session::context::QueryContext; use super::*; @@ -192,6 +208,7 @@ mod test { recordbatches, TracingContext::default(), FlightCompression::default(), + QueryContext::arc(), ); let mut raw_data = Vec::with_capacity(2); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 19ff1fa3ae..5de87ea6eb 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -882,11 +882,14 @@ CREATE TABLE {table_name} ( let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_remote_read(RegionQueryRequest { - region_id: region_id.as_u64(), - plan: plan.to_vec(), - ..Default::default() - }) + .handle_remote_read( + RegionQueryRequest { + region_id: region_id.as_u64(), + plan: plan.to_vec(), + ..Default::default() + }, + QueryContext::arc(), + ) .await .unwrap(); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index e23c4234be..49bd516e90 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -249,11 +249,14 @@ mod tests { let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_remote_read(QueryRequest { - region_id: region_id.as_u64(), - plan: plan.to_vec(), - ..Default::default() - }) + .handle_remote_read( + QueryRequest { + region_id: region_id.as_u64(), + plan: plan.to_vec(), + ..Default::default() + }, + QueryContext::arc(), + ) .await .unwrap();