From ef46cd856a094ff3b78a451803a8219e701f3e5f Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 30 Apr 2026 20:32:44 +0800 Subject: [PATCH] refactor: per review Signed-off-by: discord9 --- src/client/src/database.rs | 130 ++++++-- src/common/grpc/src/flight.rs | 3 + src/datanode/src/region_server.rs | 68 ++-- src/flow/src/batching_mode/frontend_client.rs | 293 +----------------- src/query/src/datafusion.rs | 40 ++- src/query/src/metrics.rs | 43 +++ src/servers/src/grpc/flight.rs | 29 +- 7 files changed, 237 insertions(+), 369 deletions(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 0ca16caa04..d065eae273 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -36,7 +36,7 @@ use common_catalog::build_db_string; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_grpc::flight::do_put::DoPutResponse; -use common_grpc::flight::{FlightDecoder, FlightMessage}; +use common_grpc::flight::{FLOW_EXTENSIONS_METADATA_KEY, FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::ExternalSnafu; @@ -61,12 +61,21 @@ type FlightDataStream = Pin + Send>>; type DoPutResponseStream = Pin>>>; -const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions"; - +/// Terminal metrics associated with a query output. +/// +/// For streaming outputs, metrics are only final after the stream is fully +/// drained and [`Self::is_ready`] returns `true`. Region watermark helpers keep +/// the RFC distinction between proved regions, unproved participating regions, +/// and non-participating regions. #[derive(Debug, Clone, Default)] pub struct OutputMetrics { - metrics: Arc>>, - ready: Arc, + inner: Arc, +} + +#[derive(Debug, Default)] +struct OutputMetricsInner { + metrics: RwLock>, + ready: AtomicBool, } impl OutputMetrics { @@ -74,20 +83,29 @@ impl OutputMetrics { Self::default() } + /// Replaces the current terminal metrics snapshot. pub fn update(&self, metrics: Option) { - *self.metrics.write().expect("metrics lock poisoned") = metrics; + *self.inner.metrics.write().unwrap() = metrics; } + /// Marks the terminal metrics as final for this output. pub fn mark_ready(&self) { - self.ready.store(true, Ordering::Release); + let _ = self + .inner + .ready + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire); } + /// Returns whether terminal metrics are final. + /// + /// Streaming outputs become ready only after the stream reaches EOF. pub fn is_ready(&self) -> bool { - self.ready.load(Ordering::Acquire) + self.inner.ready.load(Ordering::Acquire) } + /// Returns the latest terminal metrics snapshot, if any. pub fn get(&self) -> Option { - self.metrics.read().expect("metrics lock poisoned").clone() + self.inner.metrics.read().unwrap().clone() } /// Returns proved per-region watermarks. @@ -118,6 +136,11 @@ impl OutputMetrics { } } +/// Query output together with a handle for its terminal metrics. +/// +/// The contained [`OutputMetrics`] lets callers read stream terminal metrics +/// after consuming `output`. For non-stream outputs, metrics are ready +/// immediately. #[derive(Debug)] pub struct OutputWithMetrics { pub output: Output, @@ -125,6 +148,10 @@ pub struct OutputWithMetrics { } impl OutputWithMetrics { + /// Wraps an output with a terminal metrics handle. + /// + /// Stream outputs update the handle as the stream is consumed. Non-stream + /// outputs are marked ready immediately. pub fn from_output(output: Output) -> Self { let terminal_metrics = OutputMetrics::new(); let output = attach_terminal_metrics(output, &terminal_metrics); @@ -134,14 +161,17 @@ impl OutputWithMetrics { } } + /// Returns proved per-region watermarks from the terminal metrics. pub fn region_watermark_map(&self) -> Option> { self.metrics.region_watermark_map() } + /// Returns all regions participating in terminal metric collection. pub fn participating_regions(&self) -> Option> { self.metrics.participating_regions() } + /// Drops the terminal metrics handle and returns the original output. pub fn into_output(self) -> Output { self.output } @@ -320,7 +350,6 @@ where yield Err(BoxedError::new(e)).context(ExternalSnafu); } }; - break; } FlightMessage::AffectedRows { .. } | FlightMessage::Schema(_) => { yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} @@ -513,9 +542,6 @@ impl Database { } fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> { - // Keep this helper for simple ASCII hint values only. The wire format is the - // existing comma-separated `x-greptime-hints` metadata value and does not - // escape commas inside individual values. let Some(value) = hints .iter() .map(|(k, v)| format!("{}={}", k, v)) @@ -646,6 +672,10 @@ impl Database { .map(OutputWithMetrics::into_output) } + /// Executes a SQL query and returns the output with terminal metrics. + /// + /// For stream outputs, callers must consume the stream before reading final + /// terminal metrics from [`OutputWithMetrics::metrics`]. pub async fn sql_with_terminal_metrics( &self, sql: S, @@ -654,36 +684,33 @@ impl Database { where S: AsRef, { - self.query_with_terminal_metrics( + self.query_with_terminal_metrics_and_flow_extensions( QueryRequest { query: Some(Query::Sql(sql.as_ref().to_string())), }, hints, + &[], ) .await } /// Executes a logical plan directly without SQL parsing. pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - self.query_with_terminal_metrics( + self.query_with_terminal_metrics_and_flow_extensions( QueryRequest { query: Some(Query::LogicalPlan(logical_plan)), }, &[], + &[], ) .await .map(OutputWithMetrics::into_output) } - pub async fn query_with_terminal_metrics( - &self, - request: QueryRequest, - hints: &[(&str, &str)], - ) -> Result { - self.query_with_terminal_metrics_and_flow_extensions(request, hints, &[]) - .await - } - + /// Executes a query and carries flow extensions through Flight metadata. + /// + /// This is the lower-level terminal-metrics API for Flow callers that need + /// to pass JSON-bearing flow extensions without going through hint metadata. pub async fn query_with_terminal_metrics_and_flow_extensions( &self, request: QueryRequest, @@ -864,10 +891,14 @@ mod tests { } fn terminal_metrics_json() -> String { + terminal_metrics_json_with_seq(42) + } + + fn terminal_metrics_json_with_seq(seq: u64) -> String { serde_json::to_string(&RecordBatchMetrics { region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry { region_id: 7, - watermark: Some(42), + watermark: Some(seq), }], ..Default::default() }) @@ -1071,6 +1102,55 @@ mod tests { assert!(terminal_metrics.get().is_none()); } + #[tokio::test] + async fn test_record_batch_stream_continues_after_partial_metrics() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let first_batch = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice([1])) as VectorRef], + ) + .unwrap(); + let second_batch = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice([2])) as VectorRef], + ) + .unwrap(); + let output = output_from_flight_message_stream(futures_util::stream::iter(vec![ + Ok(FlightMessage::Schema(schema.arrow_schema().clone())), + Ok(FlightMessage::RecordBatch( + first_batch.into_df_record_batch(), + )), + Ok(FlightMessage::Metrics(terminal_metrics_json_with_seq(1))), + Ok(FlightMessage::RecordBatch( + second_batch.into_df_record_batch(), + )), + Ok(FlightMessage::Metrics(terminal_metrics_json_with_seq(2))), + ] + as Vec>)) + .await + .unwrap(); + let terminal_metrics = output.metrics.clone(); + let OutputData::Stream(mut record_batch_stream) = output.output.data else { + panic!("expected stream output"); + }; + + let first_batch = record_batch_stream.next().await.unwrap().unwrap(); + assert_eq!(first_batch.num_rows(), 1); + let second_batch = record_batch_stream.next().await.unwrap().unwrap(); + assert_eq!(second_batch.num_rows(), 1); + assert!(record_batch_stream.next().await.is_none()); + + assert!(terminal_metrics.is_ready()); + assert_eq!( + terminal_metrics.region_watermark_map(), + Some(std::collections::HashMap::from([(7, 2)])) + ); + } + #[test] fn test_output_metrics_distinguishes_empty_region_watermarks_from_absence() { let metrics = OutputMetrics::default(); diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index f09400d9b4..9d3bf65da1 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -37,6 +37,9 @@ use vec1::{Vec1, vec1}; use crate::error; use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result}; +/// Flight metadata key used to carry flow query extensions as JSON pairs. +pub const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions"; + #[derive(Debug, Clone)] pub enum FlightMessage { Schema(SchemaRef), diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 29dc0f4e03..f16c83a84b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -774,7 +774,7 @@ struct RegionWatermarkStream { stream: SendableRecordBatchStream, region_id: u64, snapshot_seq: u64, - finished: AtomicBool, + finished: bool, } impl RegionWatermarkStream { @@ -783,28 +783,25 @@ impl RegionWatermarkStream { stream, region_id: region_id.as_u64(), snapshot_seq, - finished: AtomicBool::new(false), + finished: false, } } fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics { - let entry = if let Some(entry) = metrics + if metrics .region_watermarks - .iter_mut() - .find(|entry| entry.region_id == self.region_id) + .iter() + .any(|entry| entry.region_id == self.region_id) { - entry - } else { - metrics - .region_watermarks - .push(common_recordbatch::adapter::RegionWatermarkEntry { - region_id: self.region_id, - watermark: None, - }); - metrics.region_watermarks.last_mut().unwrap() - }; + return metrics; + } - entry.watermark = Some(self.snapshot_seq); + metrics + .region_watermarks + .push(common_recordbatch::adapter::RegionWatermarkEntry { + region_id: self.region_id, + watermark: Some(self.snapshot_seq), + }); metrics } } @@ -824,7 +821,7 @@ impl RecordBatchStream for RegionWatermarkStream { fn metrics(&self) -> Option { let base = self.stream.metrics(); - if !self.finished.load(Ordering::Relaxed) { + if !self.finished { return base; } @@ -838,7 +835,7 @@ impl Stream for RegionWatermarkStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(None) => { - self.finished.store(true, Ordering::Relaxed); + self.finished = true; Poll::Ready(None) } other => other, @@ -1771,7 +1768,7 @@ mod tests { use api::v1::SemanticType; use common_error::ext::ErrorExt; use common_recordbatch::RecordBatches; - use common_recordbatch::adapter::RegionWatermarkEntry; + use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; @@ -1818,6 +1815,39 @@ mod tests { ); } + #[test] + fn test_region_watermark_stream_preserves_unproved_watermark() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1])); + let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap(); + let stream = RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream(); + + let region_id = RegionId::new(42, 7); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99); + let metrics = RecordBatchMetrics { + region_watermarks: vec![RegionWatermarkEntry { + region_id: region_id.as_u64(), + watermark: None, + }], + ..Default::default() + }; + + let merged = wrapped.merged_metrics(metrics); + assert_eq!( + merged.region_watermarks, + vec![RegionWatermarkEntry { + region_id: region_id.as_u64(), + watermark: None, + }] + ); + } + #[tokio::test] async fn test_region_registering() { common_telemetry::init_default_ut_logging(); diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 6fd80d12de..9875564c78 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -20,16 +20,15 @@ use std::sync::{Arc, Mutex, Weak}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{CreateTableExpr, QueryRequest}; -use client::{Client, Database, OutputWithMetrics}; +use client::{Client, Database}; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config}; use common_meta::peer::{Peer, PeerDiscovery}; -use common_query::{Output, OutputData}; +use common_query::Output; use common_telemetry::warn; use meta_client::client::MetaClient; use query::datafusion::QUERY_PARALLELISM_HINT; -use query::metrics::terminal_recordbatch_metrics_from_plan; -use query::options::{FlowQueryExtensions, QueryOptions}; +use query::options::QueryOptions; use rand::rng; use rand::seq::SliceRandom; use servers::query_handler::grpc::GrpcQueryHandler; @@ -343,78 +342,6 @@ impl FrontendClient { } } - pub async fn query_with_terminal_metrics( - &self, - catalog: &str, - schema: &str, - request: QueryRequest, - extensions: &[(&str, &str)], - ) -> Result { - let flow_extensions = build_flow_extensions(extensions)?; - match self { - FrontendClient::Distributed { - query, batch_opts, .. - } => { - let query_parallelism = query.parallelism.to_string(); - let hints = vec![ - (QUERY_PARALLELISM_HINT, query_parallelism.as_str()), - (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()), - ]; - let db = self.get_random_active_frontend(catalog, schema).await?; - db.database - .query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - FrontendClient::Standalone { - database_client, - query, - } => { - let mut extensions_map = HashMap::from([( - QUERY_PARALLELISM_HINT.to_string(), - query.parallelism.to_string(), - )]); - for (key, value) in extensions { - extensions_map.insert((*key).to_string(), (*value).to_string()); - } - let ctx = QueryContextBuilder::default() - .current_catalog(catalog.to_string()) - .current_schema(schema.to_string()) - .extensions(extensions_map) - .build(); - let ctx = Arc::new(ctx); - let database_client = { - database_client - .handler - .lock() - .map_err(|e| { - UnexpectedSnafu { - reason: format!("Failed to lock database client: {e}"), - } - .build() - })? - .as_ref() - .context(UnexpectedSnafu { - reason: "Standalone's frontend instance is not set", - })? - .upgrade() - .context(UnexpectedSnafu { - reason: "Failed to upgrade database client", - })? - }; - database_client - .do_query(Request::Query(request), ctx) - .await - .map(|output| { - wrap_standalone_output_with_terminal_metrics(output, &flow_extensions) - }) - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - } - } - /// Handle a request to frontend pub(crate) async fn handle( &self, @@ -505,40 +432,6 @@ impl FrontendClient { } } -fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result { - let flow_extensions = HashMap::from_iter( - extensions - .iter() - .map(|(key, value)| ((*key).to_string(), (*value).to_string())), - ); - FlowQueryExtensions::parse_flow_extensions(&flow_extensions) - .map_err(BoxedError::new) - .context(ExternalSnafu) - .map(|extensions| extensions.unwrap_or_default()) -} - -fn wrap_standalone_output_with_terminal_metrics( - output: Output, - flow_extensions: &FlowQueryExtensions, -) -> OutputWithMetrics { - let should_collect_region_watermark = flow_extensions.should_collect_region_watermark(); - let terminal_metrics = - if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) { - output - .meta - .plan - .clone() - .and_then(terminal_recordbatch_metrics_from_plan) - } else { - None - }; - let result = OutputWithMetrics::from_output(output); - if let Some(metrics) = terminal_metrics { - result.metrics.update(Some(metrics)); - } - result -} - /// Describe a peer of frontend #[derive(Debug, Default)] pub(crate) enum PeerDesc { @@ -563,17 +456,9 @@ impl std::fmt::Display for PeerDesc { #[cfg(test)] mod tests { - use std::pin::Pin; - use std::task::{Context, Poll}; use std::time::Duration; - use common_query::{Output, OutputData}; - use common_recordbatch::adapter::RecordBatchMetrics; - use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; - use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnSchema, Schema}; - use datatypes::vectors::Int32Vector; - use futures::StreamExt; + use common_query::Output; use tokio::time::timeout; use super::*; @@ -581,55 +466,6 @@ mod tests { #[derive(Debug)] struct NoopHandler; - struct MockMetricsStream { - schema: datatypes::schema::SchemaRef, - batch: Option, - metrics: RecordBatchMetrics, - terminal_metrics_only: bool, - } - - impl futures::Stream for MockMetricsStream { - type Item = common_recordbatch::error::Result; - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.batch.take().map(Ok)) - } - - fn size_hint(&self) -> (usize, Option) { - ( - usize::from(self.batch.is_some()), - Some(usize::from(self.batch.is_some())), - ) - } - } - - impl RecordBatchStream for MockMetricsStream { - fn name(&self) -> &str { - "MockMetricsStream" - } - - fn schema(&self) -> datatypes::schema::SchemaRef { - self.schema.clone() - } - - fn output_ordering(&self) -> Option<&[OrderOption]> { - None - } - - fn metrics(&self) -> Option { - if self.terminal_metrics_only && self.batch.is_some() { - return None; - } - Some(self.metrics.clone()) - } - } - - #[derive(Debug)] - struct MetricsHandler; - - #[derive(Debug)] - struct ExtensionAwareHandler; - #[async_trait::async_trait] impl GrpcQueryHandlerWithBoxedError for NoopHandler { async fn do_query( @@ -641,50 +477,6 @@ mod tests { } } - #[async_trait::async_trait] - impl GrpcQueryHandlerWithBoxedError for MetricsHandler { - async fn do_query( - &self, - _query: Request, - _ctx: QueryContextRef, - ) -> std::result::Result { - let schema = Arc::new(Schema::new(vec![ColumnSchema::new( - "v", - ConcreteDataType::int32_datatype(), - false, - )])); - let batch = RecordBatch::new( - schema.clone(), - vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef], - ) - .unwrap(); - Ok(Output::new_with_stream(Box::pin(MockMetricsStream { - schema, - batch: Some(batch), - metrics: RecordBatchMetrics { - region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry { - region_id: 42, - watermark: Some(99), - }], - ..Default::default() - }, - terminal_metrics_only: true, - }))) - } - } - - #[async_trait::async_trait] - impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler { - async fn do_query( - &self, - _query: Request, - ctx: QueryContextRef, - ) -> std::result::Result { - assert_eq!(ctx.extension("flow.return_region_seq"), Some("true")); - Ok(Output::new_with_affected_rows(1)) - } - } - #[tokio::test] async fn wait_initialized() { let (client, handler_mut) = @@ -730,81 +522,4 @@ mod tests { .is_ok() ); } - - #[tokio::test] - async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() { - let handler: Arc = Arc::new(MetricsHandler); - let client = - FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); - - let result = client - .query_with_terminal_metrics( - "greptime", - "public", - QueryRequest { - query: Some(Query::Sql("select 1".to_string())), - }, - &[], - ) - .await - .unwrap(); - - let terminal_metrics = result.metrics.clone(); - assert!(!result.metrics.is_ready()); - assert!(terminal_metrics.get().is_none()); - - let OutputData::Stream(mut stream) = result.output.data else { - panic!("expected stream output"); - }; - while stream.next().await.is_some() {} - - assert!(terminal_metrics.is_ready()); - assert_eq!( - terminal_metrics.region_watermark_map(), - Some(HashMap::from([(42_u64, 99_u64)])) - ); - } - - #[tokio::test] - async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() { - let handler: Arc = Arc::new(ExtensionAwareHandler); - let client = - FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); - - let result = client - .query_with_terminal_metrics( - "greptime", - "public", - QueryRequest { - query: Some(Query::Sql("insert into t select 1".to_string())), - }, - &[("flow.return_region_seq", "true")], - ) - .await - .unwrap(); - - assert!(result.metrics.is_ready()); - assert!(result.region_watermark_map().is_none()); - } - - #[tokio::test] - async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() { - let handler: Arc = Arc::new(NoopHandler); - let client = - FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default()); - - let err = client - .query_with_terminal_metrics( - "greptime", - "public", - QueryRequest { - query: Some(Query::Sql("select 1".to_string())), - }, - &[("flow.return_region_seq", "not-a-bool")], - ) - .await - .unwrap_err(); - - assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq")); - } } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 2d2232080d..a173f2aee8 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -60,8 +60,10 @@ use crate::error::{ TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; -use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED, RegionWatermarkMetricsStream}; -use crate::options::FlowQueryExtensions; +use crate::metrics::{ + OnDone, QUERY_STAGE_ELAPSED, maybe_attach_region_watermark_metrics, + should_collect_region_watermark_from_query_ctx, +}; use crate::physical_wrapper::PhysicalPlanWrapperRef; use crate::planner::{DfLogicalPlanner, LogicalPlanner}; use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState}; @@ -547,10 +549,10 @@ impl QueryExecutor for DatafusionQueryEngine { ctx: &QueryEngineContext, plan: &Arc, ) -> Result { - let explain_verbose = ctx.query_ctx().explain_verbose(); + let query_ctx = ctx.query_ctx(); + let explain_verbose = query_ctx.explain_verbose(); let should_collect_region_watermark = - FlowQueryExtensions::parse_flow_extensions(&ctx.query_ctx().extensions())? - .is_some_and(|extensions| extensions.should_collect_region_watermark()); + should_collect_region_watermark_from_query_ctx(&query_ctx)?; let output_partitions = plan.properties().output_partitioning().partition_count(); if explain_verbose { common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}"); @@ -586,14 +588,11 @@ impl QueryExecutor for DatafusionQueryEngine { ); } }); - if should_collect_region_watermark { - Ok(Box::pin(RegionWatermarkMetricsStream::new( - Box::pin(stream), - plan.clone(), - ))) - } else { - Ok(Box::pin(stream)) - } + Ok(maybe_attach_region_watermark_metrics( + Box::pin(stream), + plan.clone(), + should_collect_region_watermark, + )) } _ => { // merge into a single partition @@ -612,7 +611,7 @@ impl QueryExecutor for DatafusionQueryEngine { .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; stream.set_metrics2(plan.clone()); - stream.set_explain_verbose(ctx.query_ctx().explain_verbose()); + stream.set_explain_verbose(explain_verbose); let stream = OnDone::new(Box::pin(stream), move || { let exec_cost = exec_timer.stop_and_record(); if explain_verbose { @@ -622,14 +621,11 @@ impl QueryExecutor for DatafusionQueryEngine { ); } }); - if should_collect_region_watermark { - Ok(Box::pin(RegionWatermarkMetricsStream::new( - Box::pin(stream), - plan.clone(), - ))) - } else { - Ok(Box::pin(stream)) - } + Ok(maybe_attach_region_watermark_metrics( + Box::pin(stream), + plan.clone(), + should_collect_region_watermark, + )) } } } diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index 02bfd43b5d..7b517354fe 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -26,8 +26,11 @@ use futures::Stream; use futures_util::ready; use lazy_static::lazy_static; use prometheus::*; +use session::context::QueryContextRef; use crate::dist_plan::MergeScanExec; +use crate::error::Result; +use crate::options::FlowQueryExtensions; /// Intermediate merge state for one participating region while collecting /// terminal correctness watermarks across merge-scan sub-stages. @@ -201,6 +204,27 @@ impl Stream for RegionWatermarkMetricsStream { } } +/// Returns whether terminal region watermark metrics should be collected for the query context. +pub fn should_collect_region_watermark_from_query_ctx(query_ctx: &QueryContextRef) -> Result { + Ok( + FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())? + .is_some_and(|extensions| extensions.should_collect_region_watermark()), + ) +} + +/// Attaches terminal region watermark metrics to `stream` when collection is requested. +pub fn maybe_attach_region_watermark_metrics( + stream: SendableRecordBatchStream, + plan: Arc, + should_collect_region_watermark: bool, +) -> SendableRecordBatchStream { + if should_collect_region_watermark { + Box::pin(RegionWatermarkMetricsStream::new(stream, plan)) + } else { + stream + } +} + pub fn terminal_recordbatch_metrics_from_plan( plan: Arc, ) -> Option { @@ -215,6 +239,18 @@ pub fn terminal_recordbatch_metrics_from_plan( } } +/// Collects terminal record-batch metrics from `plan` only when requested. +pub fn terminal_recordbatch_metrics_from_plan_if_requested( + plan: Option>, + should_collect_region_watermark: bool, +) -> Option { + if should_collect_region_watermark { + plan.and_then(terminal_recordbatch_metrics_from_plan) + } else { + None + } +} + fn collect_region_watermarks(plan: Arc) -> Vec { let mut merged = BTreeMap::::new(); let mut stack = vec![plan]; @@ -240,6 +276,10 @@ fn merge_region_watermark_entries( merged: &mut BTreeMap, entries: impl IntoIterator, ) { + // Merge by correctness rather than by maximum/minimum sequence. Any + // explicit unproved entry (`None`) vetoes a proved watermark, and + // conflicting proofs degrade the region back to unproved so Flow cannot + // advance checkpoints from ambiguous terminal metrics. for entry in entries { merged .entry(entry.region_id) @@ -281,6 +321,9 @@ fn merge_merge_scan_region_watermarks( regions: impl IntoIterator, sub_stage_metrics: impl IntoIterator, ) { + // Regions listed by MergeScanExec participated even when no sub-stage can + // prove a watermark. Keep them as explicit `None` entries so callers can + // distinguish unproved participation from non-participation. for region_id in regions { merged.entry(region_id).or_insert(MergeState::Participated); } diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index c85019a99c..a2ff787d1b 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -28,7 +28,9 @@ use async_trait::async_trait; use bytes::{self, Bytes}; use common_error::ext::ErrorExt; use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse}; -use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; +use common_grpc::flight::{ + FLOW_EXTENSIONS_METADATA_KEY, FlightDecoder, FlightEncoder, FlightMessage, +}; use common_memory_manager::MemoryGuard; use common_query::{Output, OutputData}; use common_recordbatch::DfRecordBatch; @@ -39,7 +41,7 @@ use datatypes::arrow::datatypes::SchemaRef; use futures::{Stream, future, ready}; use futures_util::{StreamExt, TryStreamExt}; use prost::Message; -use query::metrics::terminal_recordbatch_metrics_from_plan; +use query::metrics::terminal_recordbatch_metrics_from_plan_if_requested; use query::options::FlowQueryExtensions; use session::context::{Channel, QueryContextRef}; use snafu::{IntoError, ResultExt, ensure}; @@ -58,8 +60,6 @@ use crate::request_memory_limiter::ServerMemoryLimiter; use crate::request_memory_metrics::RequestMemoryMetrics; use crate::{error, hint_headers}; -const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions"; - pub type TonicStream = Pin> + Send + 'static>>; /// A subset of [FlightService] @@ -204,8 +204,11 @@ impl FlightCraft for GreptimeRequestHandler { // Validate flow hint syntax at the transport boundary before dispatching the request. // This does not authorize or execute anything; `handle_request()` below still performs // the normal frontend handling and auth checks before query execution. - FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions()) + let flow_extensions = FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions()) .map_err(|e| Status::invalid_argument(e.output_msg()))?; + let should_emit_terminal_metrics = flow_extensions + .as_ref() + .is_some_and(|extensions| extensions.should_collect_region_watermark()); // The Grpc protocol pass query by Flight. It needs to be wrapped under a span, in order to record stream let span = info_span!( @@ -221,6 +224,7 @@ impl FlightCraft for GreptimeRequestHandler { TracingContext::from_current_span(), flight_compression, query_ctx, + should_emit_terminal_metrics, ); Ok(Response::new(stream)) } @@ -552,6 +556,7 @@ fn to_flight_data_stream( tracing_context: TracingContext, flight_compression: FlightCompression, query_ctx: QueryContextRef, + should_emit_terminal_metrics: bool, ) -> TonicStream { match output.data { OutputData::Stream(stream) => { @@ -573,15 +578,11 @@ fn to_flight_data_stream( Box::pin(stream) as _ } OutputData::AffectedRows(rows) => { - let should_emit_terminal_metrics = - FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions()) - .expect("flow extensions must be validated before Flight serialization") - .is_some_and(|extensions| extensions.should_collect_region_watermark()); - let terminal_metrics = should_emit_terminal_metrics - .then_some(output.meta.plan) - .flatten() - .and_then(terminal_recordbatch_metrics_from_plan) - .and_then(|metrics| serde_json::to_string(&metrics).ok()); + let terminal_metrics = terminal_recordbatch_metrics_from_plan_if_requested( + output.meta.plan, + should_emit_terminal_metrics, + ) + .and_then(|metrics| serde_json::to_string(&metrics).ok()); let affected_rows = FlightEncoder::default().encode(FlightMessage::AffectedRows { rows, metrics: terminal_metrics,