feat: update partial execution metrics (#6499)

* feat: update partial execution metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* send data with metrics in distributed mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* only send partial metrics under VERBOSE flag

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* loop to while

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2025-07-16 09:59:10 -07:00
committed by Yingwen
parent 291f3c89fe
commit 717c1d1807
9 changed files with 165 additions and 52 deletions

View File

@@ -163,19 +163,70 @@ impl RegionRequester {
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream"
));
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut buffered_message: Option<FlightMessage> = None;
let mut stream_ended = false;
while !stream_ended {
// get the next message from the buffered message or read from the flight message stream
let flight_message_item = if let Some(msg) = buffered_message.take() {
Some(Ok(msg))
} else {
flight_message_stream.next().await
};
let flight_message = match flight_message_item {
Some(Ok(message)) => message,
Some(Err(e)) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
None => break,
};
match flight_message {
FlightMessage::RecordBatch(record_batch) => {
yield RecordBatch::try_from_df_record_batch(
let result_to_yield = RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
record_batch,
)
);
// get the next message from the stream. normally it should be a metrics message.
if let Some(next_flight_message_result) = flight_message_stream.next().await
{
match next_flight_message_result {
Ok(FlightMessage::Metrics(s)) => {
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
}
Ok(FlightMessage::RecordBatch(rb)) => {
// for some reason it's not a metrics message, so we need to buffer this record batch
// and yield it in the next iteration.
buffered_message = Some(FlightMessage::RecordBatch(rb));
}
Ok(_) => {
yield IllegalFlightMessagesSnafu {
reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
}
Err(e) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
}
} else {
// the stream has ended
stream_ended = true;
}
yield result_to_yield;
}
FlightMessage::Metrics(s) => {
// just a fallback branch in case a metrics message comes after something other than a record batch.
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;

View File

@@ -222,6 +222,7 @@ pub struct RecordBatchStreamAdapter {
/// Tracks how far the execution metrics of the adapted stream have been collected.
enum Metrics {
/// No metrics can be reported; `metrics()` yields `None` in this state.
Unavailable,
/// Holds the plan that metrics will be collected from; nothing has been
/// collected yet, so `metrics()` still yields `None`.
Unresolved(Arc<dyn ExecutionPlan>),
/// Holds the plan together with the metrics snapshot taken after the most
/// recently polled batch. The snapshot is refreshed on every batch while the
/// stream is still producing data, and is exposed through `metrics()`.
PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
/// Metrics collected from the plan once the inner stream returned `None`.
Resolved(RecordBatchMetrics),
}
@@ -275,7 +276,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter {
/// Returns the execution metrics collected so far, if any.
///
/// Yields `Some` with a clone of the stored metrics when they are either
/// fully resolved or partially resolved (a snapshot taken while the stream
/// is still running), and `None` when metrics are unavailable or have not
/// been collected yet.
fn metrics(&self) -> Option<RecordBatchMetrics> {
    match &self.metrics_2 {
        // One or-pattern arm covers both states that carry metrics. A separate
        // `Metrics::Resolved` arm ahead of it would make the `Resolved`
        // alternative here unreachable.
        Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
            Some(metrics.clone())
        }
        Metrics::Unavailable | Metrics::Unresolved(_) => None,
    }
}
@@ -299,13 +302,25 @@ impl Stream for RecordBatchStreamAdapter {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {
let df_record_batch = df_record_batch?;
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
&self.metrics_2
{
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::PartialResolved(
df_plan.clone(),
metric_collector.record_batch_metrics,
);
}
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
self.schema(),
df_record_batch,
)))
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
&self.metrics_2
{
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);

View File

@@ -51,7 +51,7 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
@@ -194,6 +194,7 @@ impl RegionServer {
pub async fn handle_remote_read(
&self,
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
let _permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
@@ -201,12 +202,6 @@ impl RegionServer {
None
};
let query_ctx: QueryContextRef = request
.header
.as_ref()
.map(|h| Arc::new(h.into()))
.unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -214,7 +209,7 @@ impl RegionServer {
let decoder = self
.inner
.query_engine
.engine_context(query_ctx)
.engine_context(query_ctx.clone())
.new_plan_decoder()
.context(NewPlanDecoderSnafu)?;
@@ -224,11 +219,14 @@ impl RegionServer {
.context(DecodeLogicalPlanSnafu)?;
self.inner
.handle_read(QueryRequest {
header: request.header,
region_id,
plan,
})
.handle_read(
QueryRequest {
header: request.header,
region_id,
plan,
},
query_ctx,
)
.await
}
@@ -243,6 +241,7 @@ impl RegionServer {
let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
struct RegionDataSourceInjector {
source: Arc<dyn TableSource>,
@@ -271,7 +270,7 @@ impl RegionServer {
.data;
self.inner
.handle_read(QueryRequest { plan, ..request })
.handle_read(QueryRequest { plan, ..request }, query_ctx)
.await
}
@@ -536,9 +535,14 @@ impl FlightCraft for RegionServer {
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default();
let query_ctx = request
.header
.as_ref()
.map(|h| Arc::new(QueryContext::from(h)))
.unwrap_or(QueryContext::arc());
let result = self
.handle_remote_read(request)
.handle_remote_read(request, query_ctx.clone())
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
@@ -546,6 +550,7 @@ impl FlightCraft for RegionServer {
result,
tracing_context,
self.flight_compression,
query_ctx,
));
Ok(Response::new(stream))
}
@@ -1123,16 +1128,13 @@ impl RegionServerInner {
Ok(())
}
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
pub async fn handle_read(
&self,
request: QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
// TODO(ruihang): add metrics and set trace id
// Build query context from gRPC header
let query_ctx: QueryContextRef = request
.header
.as_ref()
.map(|h| Arc::new(h.into()))
.unwrap_or_else(|| QueryContextBuilder::default().build().into());
let result = self
.query_engine
.execute(request.plan, query_ctx)

View File

@@ -530,9 +530,6 @@ transform:
.into_transformed()
.unwrap();
// println!("[DEBUG]schema_info: {:?}", schema_info.schema);
// println!("[DEBUG]re: {:?}", result.0.values);
assert_eq!(schema_info.schema.len(), result.0.values.len());
let test = vec![
(

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use ahash::HashSet;
use ahash::{HashMap, HashSet};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
@@ -143,7 +143,7 @@ pub struct MergeScanExec {
metric: ExecutionPlanMetricsSet,
properties: PlanProperties,
/// Metrics from sub stages
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
@@ -317,6 +317,12 @@ impl MergeScanExec {
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
}
if let Some(metrics) = stream.metrics() {
let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
sub_stage_metrics.insert(region_id, metrics);
}
yield Ok(batch);
// reset poll timer
poll_timer = Instant::now();
@@ -341,7 +347,8 @@ impl MergeScanExec {
metric.record_greptime_exec_cost(value as usize);
// record metrics from sub stages
sub_stage_metrics_moved.lock().unwrap().push(metrics);
let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
sub_stage_metrics.insert(region_id, metrics);
}
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
@@ -409,7 +416,12 @@ impl MergeScanExec {
}
/// Returns a snapshot of the metrics reported by the sub stages.
///
/// The metrics are stored in a map keyed by region, so each region
/// contributes at most one entry (its latest reported metrics). The values
/// are cloned out while the lock is held; their order in the returned vector
/// follows the map's iteration order and is therefore unspecified.
///
/// # Panics
///
/// Panics if the mutex guarding the metrics has been poisoned.
pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
    // Cloning the whole container would yield a map, not the `Vec` callers
    // expect, so only the values are collected.
    self.sub_stage_metrics
        .lock()
        .unwrap()
        .values()
        .cloned()
        .collect()
}
pub fn partition_count(&self) -> usize {

View File

@@ -36,6 +36,7 @@ use common_telemetry::tracing_context::{FutureExt, TracingContext};
use futures::{future, ready, Stream};
use futures_util::{StreamExt, TryStreamExt};
use prost::Message;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, ResultExt};
use table::table_name::TableName;
use tokio::sync::mpsc;
@@ -188,6 +189,7 @@ impl FlightCraft for GreptimeRequestHandler {
let ticket = request.into_inner().ticket;
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let query_ctx = QueryContext::arc();
// The gRPC protocol passes queries via Flight. It needs to be wrapped in a span in order to record the stream
let span = info_span!(
@@ -202,6 +204,7 @@ impl FlightCraft for GreptimeRequestHandler {
output,
TracingContext::from_current_span(),
flight_compression,
query_ctx,
);
Ok(Response::new(stream))
}
@@ -371,15 +374,25 @@ fn to_flight_data_stream(
output: Output,
tracing_context: TracingContext,
flight_compression: FlightCompression,
query_ctx: QueryContextRef,
) -> TonicStream<FlightData> {
match output.data {
OutputData::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
let stream = FlightRecordBatchStream::new(
stream,
tracing_context,
flight_compression,
query_ctx,
);
Box::pin(stream) as _
}
OutputData::RecordBatches(x) => {
let stream =
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
let stream = FlightRecordBatchStream::new(
x.as_stream(),
tracing_context,
flight_compression,
query_ctx,
);
Box::pin(stream) as _
}
OutputData::AffectedRows(rows) => {

View File

@@ -25,6 +25,7 @@ use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use session::context::QueryContextRef;
use snafu::ResultExt;
use tokio::task::JoinHandle;
@@ -46,10 +47,12 @@ impl FlightRecordBatchStream {
recordbatches: SendableRecordBatchStream,
tracing_context: TracingContext,
compression: FlightCompression,
query_ctx: QueryContextRef,
) -> Self {
let should_send_partial_metrics = query_ctx.explain_verbose();
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle = common_runtime::spawn_global(async move {
Self::flight_data_stream(recordbatches, tx)
Self::flight_data_stream(recordbatches, tx, should_send_partial_metrics)
.trace(tracing_context.attach(info_span!("flight_data_stream")))
.await
});
@@ -69,6 +72,7 @@ impl FlightRecordBatchStream {
async fn flight_data_stream(
mut recordbatches: SendableRecordBatchStream,
mut tx: Sender<TonicResult<FlightMessage>>,
should_send_partial_metrics: bool,
) {
let schema = recordbatches.schema().arrow_schema().clone();
if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await {
@@ -88,6 +92,17 @@ impl FlightRecordBatchStream {
warn!(e; "stop sending Flight data");
return;
}
if should_send_partial_metrics {
if let Some(metrics) = recordbatches
.metrics()
.and_then(|m| serde_json::to_string(&m).ok())
{
if let Err(e) = tx.send(Ok(FlightMessage::Metrics(metrics))).await {
warn!(e; "stop sending Flight data");
return;
}
}
}
}
Err(e) => {
let e = Err(e).context(error::CollectRecordbatchSnafu);
@@ -154,6 +169,7 @@ mod test {
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures::StreamExt;
use session::context::QueryContext;
use super::*;
@@ -175,6 +191,7 @@ mod test {
recordbatches,
TracingContext::default(),
FlightCompression::default(),
QueryContext::arc(),
);
let mut raw_data = Vec::with_capacity(2);

View File

@@ -882,11 +882,14 @@ CREATE TABLE {table_name} (
let region_id = RegionId::new(table_id, *region);
let stream = region_server
.handle_remote_read(RegionQueryRequest {
region_id: region_id.as_u64(),
plan: plan.to_vec(),
..Default::default()
})
.handle_remote_read(
RegionQueryRequest {
region_id: region_id.as_u64(),
plan: plan.to_vec(),
..Default::default()
},
QueryContext::arc(),
)
.await
.unwrap();

View File

@@ -249,11 +249,14 @@ mod tests {
let region_id = RegionId::new(table_id, *region);
let stream = region_server
.handle_remote_read(QueryRequest {
region_id: region_id.as_u64(),
plan: plan.to_vec(),
..Default::default()
})
.handle_remote_read(
QueryRequest {
region_id: region_id.as_u64(),
plan: plan.to_vec(),
..Default::default()
},
QueryContext::arc(),
)
.await
.unwrap();