refactor: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-30 20:32:44 +08:00
parent f0e16c2d4c
commit ef46cd856a
7 changed files with 237 additions and 369 deletions

View File

@@ -36,7 +36,7 @@ use common_catalog::build_db_string;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::flight::{FLOW_EXTENSIONS_METADATA_KEY, FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::ExternalSnafu;
@@ -61,12 +61,21 @@ type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>;
type DoPutResponseStream = Pin<Box<dyn Stream<Item = Result<DoPutResponse>>>>;
const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions";
/// Terminal metrics associated with a query output.
///
/// For streaming outputs, metrics are only final after the stream is fully
/// drained and [`Self::is_ready`] returns `true`. Region watermark helpers keep
/// the RFC distinction between proved regions, unproved participating regions,
/// and non-participating regions.
#[derive(Debug, Clone, Default)]
pub struct OutputMetrics {
metrics: Arc<RwLock<Option<RecordBatchMetrics>>>,
ready: Arc<AtomicBool>,
inner: Arc<OutputMetricsInner>,
}
#[derive(Debug, Default)]
struct OutputMetricsInner {
metrics: RwLock<Option<RecordBatchMetrics>>,
ready: AtomicBool,
}
impl OutputMetrics {
@@ -74,20 +83,29 @@ impl OutputMetrics {
Self::default()
}
/// Replaces the current terminal metrics snapshot.
pub fn update(&self, metrics: Option<RecordBatchMetrics>) {
*self.metrics.write().expect("metrics lock poisoned") = metrics;
*self.inner.metrics.write().unwrap() = metrics;
}
/// Marks the terminal metrics as final for this output.
pub fn mark_ready(&self) {
self.ready.store(true, Ordering::Release);
let _ = self
.inner
.ready
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
}
/// Returns whether terminal metrics are final.
///
/// Streaming outputs become ready only after the stream reaches EOF.
pub fn is_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
self.inner.ready.load(Ordering::Acquire)
}
/// Returns the latest terminal metrics snapshot, if any.
pub fn get(&self) -> Option<RecordBatchMetrics> {
self.metrics.read().expect("metrics lock poisoned").clone()
self.inner.metrics.read().unwrap().clone()
}
/// Returns proved per-region watermarks.
@@ -118,6 +136,11 @@ impl OutputMetrics {
}
}
/// Query output together with a handle for its terminal metrics.
///
/// The contained [`OutputMetrics`] lets callers read stream terminal metrics
/// after consuming `output`. For non-stream outputs, metrics are ready
/// immediately.
#[derive(Debug)]
pub struct OutputWithMetrics {
pub output: Output,
@@ -125,6 +148,10 @@ pub struct OutputWithMetrics {
}
impl OutputWithMetrics {
/// Wraps an output with a terminal metrics handle.
///
/// Stream outputs update the handle as the stream is consumed. Non-stream
/// outputs are marked ready immediately.
pub fn from_output(output: Output) -> Self {
let terminal_metrics = OutputMetrics::new();
let output = attach_terminal_metrics(output, &terminal_metrics);
@@ -134,14 +161,17 @@ impl OutputWithMetrics {
}
}
/// Returns proved per-region watermarks from the terminal metrics.
pub fn region_watermark_map(&self) -> Option<std::collections::HashMap<u64, u64>> {
self.metrics.region_watermark_map()
}
/// Returns all regions participating in terminal metric collection.
pub fn participating_regions(&self) -> Option<std::collections::BTreeSet<u64>> {
self.metrics.participating_regions()
}
/// Drops the terminal metrics handle and returns the original output.
pub fn into_output(self) -> Output {
self.output
}
@@ -320,7 +350,6 @@ where
yield Err(BoxedError::new(e)).context(ExternalSnafu);
}
};
break;
}
FlightMessage::AffectedRows { .. } | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
@@ -513,9 +542,6 @@ impl Database {
}
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
// Keep this helper for simple ASCII hint values only. The wire format is the
// existing comma-separated `x-greptime-hints` metadata value and does not
// escape commas inside individual values.
let Some(value) = hints
.iter()
.map(|(k, v)| format!("{}={}", k, v))
@@ -646,6 +672,10 @@ impl Database {
.map(OutputWithMetrics::into_output)
}
/// Executes a SQL query and returns the output with terminal metrics.
///
/// For stream outputs, callers must consume the stream before reading final
/// terminal metrics from [`OutputWithMetrics::metrics`].
pub async fn sql_with_terminal_metrics<S>(
&self,
sql: S,
@@ -654,36 +684,33 @@ impl Database {
where
S: AsRef<str>,
{
self.query_with_terminal_metrics(
self.query_with_terminal_metrics_and_flow_extensions(
QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
},
hints,
&[],
)
.await
}
/// Executes a logical plan directly without SQL parsing.
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
self.query_with_terminal_metrics(
self.query_with_terminal_metrics_and_flow_extensions(
QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
},
&[],
&[],
)
.await
.map(OutputWithMetrics::into_output)
}
pub async fn query_with_terminal_metrics(
&self,
request: QueryRequest,
hints: &[(&str, &str)],
) -> Result<OutputWithMetrics> {
self.query_with_terminal_metrics_and_flow_extensions(request, hints, &[])
.await
}
/// Executes a query and carries flow extensions through Flight metadata.
///
/// This is the lower-level terminal-metrics API for Flow callers that need
/// to pass JSON-bearing flow extensions without going through hint metadata.
pub async fn query_with_terminal_metrics_and_flow_extensions(
&self,
request: QueryRequest,
@@ -864,10 +891,14 @@ mod tests {
}
fn terminal_metrics_json() -> String {
terminal_metrics_json_with_seq(42)
}
fn terminal_metrics_json_with_seq(seq: u64) -> String {
serde_json::to_string(&RecordBatchMetrics {
region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
region_id: 7,
watermark: Some(42),
watermark: Some(seq),
}],
..Default::default()
})
@@ -1071,6 +1102,55 @@ mod tests {
assert!(terminal_metrics.get().is_none());
}
#[tokio::test]
async fn test_record_batch_stream_continues_after_partial_metrics() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let first_batch = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([1])) as VectorRef],
)
.unwrap();
let second_batch = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([2])) as VectorRef],
)
.unwrap();
let output = output_from_flight_message_stream(futures_util::stream::iter(vec![
Ok(FlightMessage::Schema(schema.arrow_schema().clone())),
Ok(FlightMessage::RecordBatch(
first_batch.into_df_record_batch(),
)),
Ok(FlightMessage::Metrics(terminal_metrics_json_with_seq(1))),
Ok(FlightMessage::RecordBatch(
second_batch.into_df_record_batch(),
)),
Ok(FlightMessage::Metrics(terminal_metrics_json_with_seq(2))),
]
as Vec<Result<FlightMessage>>))
.await
.unwrap();
let terminal_metrics = output.metrics.clone();
let OutputData::Stream(mut record_batch_stream) = output.output.data else {
panic!("expected stream output");
};
let first_batch = record_batch_stream.next().await.unwrap().unwrap();
assert_eq!(first_batch.num_rows(), 1);
let second_batch = record_batch_stream.next().await.unwrap().unwrap();
assert_eq!(second_batch.num_rows(), 1);
assert!(record_batch_stream.next().await.is_none());
assert!(terminal_metrics.is_ready());
assert_eq!(
terminal_metrics.region_watermark_map(),
Some(std::collections::HashMap::from([(7, 2)]))
);
}
#[test]
fn test_output_metrics_distinguishes_empty_region_watermarks_from_absence() {
let metrics = OutputMetrics::default();

View File

@@ -37,6 +37,9 @@ use vec1::{Vec1, vec1};
use crate::error;
use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result};
/// Flight metadata key used to carry flow query extensions as JSON pairs.
pub const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions";
#[derive(Debug, Clone)]
pub enum FlightMessage {
Schema(SchemaRef),

View File

@@ -774,7 +774,7 @@ struct RegionWatermarkStream {
stream: SendableRecordBatchStream,
region_id: u64,
snapshot_seq: u64,
finished: AtomicBool,
finished: bool,
}
impl RegionWatermarkStream {
@@ -783,28 +783,25 @@ impl RegionWatermarkStream {
stream,
region_id: region_id.as_u64(),
snapshot_seq,
finished: AtomicBool::new(false),
finished: false,
}
}
fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
let entry = if let Some(entry) = metrics
if metrics
.region_watermarks
.iter_mut()
.find(|entry| entry.region_id == self.region_id)
.iter()
.any(|entry| entry.region_id == self.region_id)
{
entry
} else {
metrics
.region_watermarks
.push(common_recordbatch::adapter::RegionWatermarkEntry {
region_id: self.region_id,
watermark: None,
});
metrics.region_watermarks.last_mut().unwrap()
};
return metrics;
}
entry.watermark = Some(self.snapshot_seq);
metrics
.region_watermarks
.push(common_recordbatch::adapter::RegionWatermarkEntry {
region_id: self.region_id,
watermark: Some(self.snapshot_seq),
});
metrics
}
}
@@ -824,7 +821,7 @@ impl RecordBatchStream for RegionWatermarkStream {
fn metrics(&self) -> Option<RecordBatchMetrics> {
let base = self.stream.metrics();
if !self.finished.load(Ordering::Relaxed) {
if !self.finished {
return base;
}
@@ -838,7 +835,7 @@ impl Stream for RegionWatermarkStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(None) => {
self.finished.store(true, Ordering::Relaxed);
self.finished = true;
Poll::Ready(None)
}
other => other,
@@ -1771,7 +1768,7 @@ mod tests {
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::RegionWatermarkEntry;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
@@ -1818,6 +1815,39 @@ mod tests {
);
}
#[test]
fn test_region_watermark_stream_preserves_unproved_watermark() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
let stream = RecordBatches::try_new(schema, vec![batch])
.unwrap()
.as_stream();
let region_id = RegionId::new(42, 7);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
let metrics = RecordBatchMetrics {
region_watermarks: vec![RegionWatermarkEntry {
region_id: region_id.as_u64(),
watermark: None,
}],
..Default::default()
};
let merged = wrapped.merged_metrics(metrics);
assert_eq!(
merged.region_watermarks,
vec![RegionWatermarkEntry {
region_id: region_id.as_u64(),
watermark: None,
}]
);
}
#[tokio::test]
async fn test_region_registering() {
common_telemetry::init_default_ut_logging();

View File

@@ -20,16 +20,15 @@ use std::sync::{Arc, Mutex, Weak};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database, OutputWithMetrics};
use client::{Client, Database};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::peer::{Peer, PeerDiscovery};
use common_query::{Output, OutputData};
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
use query::datafusion::QUERY_PARALLELISM_HINT;
use query::metrics::terminal_recordbatch_metrics_from_plan;
use query::options::{FlowQueryExtensions, QueryOptions};
use query::options::QueryOptions;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -343,78 +342,6 @@ impl FrontendClient {
}
}
pub async fn query_with_terminal_metrics(
&self,
catalog: &str,
schema: &str,
request: QueryRequest,
extensions: &[(&str, &str)],
) -> Result<OutputWithMetrics, Error> {
let flow_extensions = build_flow_extensions(extensions)?;
match self {
FrontendClient::Distributed {
query, batch_opts, ..
} => {
let query_parallelism = query.parallelism.to_string();
let hints = vec![
(QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
(READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
];
let db = self.get_random_active_frontend(catalog, schema).await?;
db.database
.query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
FrontendClient::Standalone {
database_client,
query,
} => {
let mut extensions_map = HashMap::from([(
QUERY_PARALLELISM_HINT.to_string(),
query.parallelism.to_string(),
)]);
for (key, value) in extensions {
extensions_map.insert((*key).to_string(), (*value).to_string());
}
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.extensions(extensions_map)
.build();
let ctx = Arc::new(ctx);
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
})?
.upgrade()
.context(UnexpectedSnafu {
reason: "Failed to upgrade database client",
})?
};
database_client
.do_query(Request::Query(request), ctx)
.await
.map(|output| {
wrap_standalone_output_with_terminal_metrics(output, &flow_extensions)
})
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
}
}
/// Handle a request to frontend
pub(crate) async fn handle(
&self,
@@ -505,40 +432,6 @@ impl FrontendClient {
}
}
fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
let flow_extensions = HashMap::from_iter(
extensions
.iter()
.map(|(key, value)| ((*key).to_string(), (*value).to_string())),
);
FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
.map_err(BoxedError::new)
.context(ExternalSnafu)
.map(|extensions| extensions.unwrap_or_default())
}
fn wrap_standalone_output_with_terminal_metrics(
output: Output,
flow_extensions: &FlowQueryExtensions,
) -> OutputWithMetrics {
let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
let terminal_metrics =
if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
output
.meta
.plan
.clone()
.and_then(terminal_recordbatch_metrics_from_plan)
} else {
None
};
let result = OutputWithMetrics::from_output(output);
if let Some(metrics) = terminal_metrics {
result.metrics.update(Some(metrics));
}
result
}
/// Describe a peer of frontend
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
@@ -563,17 +456,9 @@ impl std::fmt::Display for PeerDesc {
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures::StreamExt;
use common_query::Output;
use tokio::time::timeout;
use super::*;
@@ -581,55 +466,6 @@ mod tests {
#[derive(Debug)]
struct NoopHandler;
struct MockMetricsStream {
schema: datatypes::schema::SchemaRef,
batch: Option<RecordBatch>,
metrics: RecordBatchMetrics,
terminal_metrics_only: bool,
}
impl futures::Stream for MockMetricsStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.batch.take().map(Ok))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(
usize::from(self.batch.is_some()),
Some(usize::from(self.batch.is_some())),
)
}
}
impl RecordBatchStream for MockMetricsStream {
fn name(&self) -> &str {
"MockMetricsStream"
}
fn schema(&self) -> datatypes::schema::SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
if self.terminal_metrics_only && self.batch.is_some() {
return None;
}
Some(self.metrics.clone())
}
}
#[derive(Debug)]
struct MetricsHandler;
#[derive(Debug)]
struct ExtensionAwareHandler;
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
@@ -641,50 +477,6 @@ mod tests {
}
}
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let batch = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
)
.unwrap();
Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
schema,
batch: Some(batch),
metrics: RecordBatchMetrics {
region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
region_id: 42,
watermark: Some(99),
}],
..Default::default()
},
terminal_metrics_only: true,
})))
}
}
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
async fn do_query(
&self,
_query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
Ok(Output::new_with_affected_rows(1))
}
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
@@ -730,81 +522,4 @@ mod tests {
.is_ok()
);
}
#[tokio::test]
async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let result = client
.query_with_terminal_metrics(
"greptime",
"public",
QueryRequest {
query: Some(Query::Sql("select 1".to_string())),
},
&[],
)
.await
.unwrap();
let terminal_metrics = result.metrics.clone();
assert!(!result.metrics.is_ready());
assert!(terminal_metrics.get().is_none());
let OutputData::Stream(mut stream) = result.output.data else {
panic!("expected stream output");
};
while stream.next().await.is_some() {}
assert!(terminal_metrics.is_ready());
assert_eq!(
terminal_metrics.region_watermark_map(),
Some(HashMap::from([(42_u64, 99_u64)]))
);
}
#[tokio::test]
async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let result = client
.query_with_terminal_metrics(
"greptime",
"public",
QueryRequest {
query: Some(Query::Sql("insert into t select 1".to_string())),
},
&[("flow.return_region_seq", "true")],
)
.await
.unwrap();
assert!(result.metrics.is_ready());
assert!(result.region_watermark_map().is_none());
}
#[tokio::test]
async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
let err = client
.query_with_terminal_metrics(
"greptime",
"public",
QueryRequest {
query: Some(Query::Sql("select 1".to_string())),
},
&[("flow.return_region_seq", "not-a-bool")],
)
.await
.unwrap_err();
assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
}
}

View File

@@ -60,8 +60,10 @@ use crate::error::{
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED, RegionWatermarkMetricsStream};
use crate::options::FlowQueryExtensions;
use crate::metrics::{
OnDone, QUERY_STAGE_ELAPSED, maybe_attach_region_watermark_metrics,
should_collect_region_watermark_from_query_ctx,
};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
@@ -547,10 +549,10 @@ impl QueryExecutor for DatafusionQueryEngine {
ctx: &QueryEngineContext,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
let explain_verbose = ctx.query_ctx().explain_verbose();
let query_ctx = ctx.query_ctx();
let explain_verbose = query_ctx.explain_verbose();
let should_collect_region_watermark =
FlowQueryExtensions::parse_flow_extensions(&ctx.query_ctx().extensions())?
.is_some_and(|extensions| extensions.should_collect_region_watermark());
should_collect_region_watermark_from_query_ctx(&query_ctx)?;
let output_partitions = plan.properties().output_partitioning().partition_count();
if explain_verbose {
common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
@@ -586,14 +588,11 @@ impl QueryExecutor for DatafusionQueryEngine {
);
}
});
if should_collect_region_watermark {
Ok(Box::pin(RegionWatermarkMetricsStream::new(
Box::pin(stream),
plan.clone(),
)))
} else {
Ok(Box::pin(stream))
}
Ok(maybe_attach_region_watermark_metrics(
Box::pin(stream),
plan.clone(),
should_collect_region_watermark,
))
}
_ => {
// merge into a single partition
@@ -612,7 +611,7 @@ impl QueryExecutor for DatafusionQueryEngine {
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
stream.set_metrics2(plan.clone());
stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
stream.set_explain_verbose(explain_verbose);
let stream = OnDone::new(Box::pin(stream), move || {
let exec_cost = exec_timer.stop_and_record();
if explain_verbose {
@@ -622,14 +621,11 @@ impl QueryExecutor for DatafusionQueryEngine {
);
}
});
if should_collect_region_watermark {
Ok(Box::pin(RegionWatermarkMetricsStream::new(
Box::pin(stream),
plan.clone(),
)))
} else {
Ok(Box::pin(stream))
}
Ok(maybe_attach_region_watermark_metrics(
Box::pin(stream),
plan.clone(),
should_collect_region_watermark,
))
}
}
}

View File

@@ -26,8 +26,11 @@ use futures::Stream;
use futures_util::ready;
use lazy_static::lazy_static;
use prometheus::*;
use session::context::QueryContextRef;
use crate::dist_plan::MergeScanExec;
use crate::error::Result;
use crate::options::FlowQueryExtensions;
/// Intermediate merge state for one participating region while collecting
/// terminal correctness watermarks across merge-scan sub-stages.
@@ -201,6 +204,27 @@ impl Stream for RegionWatermarkMetricsStream {
}
}
/// Returns whether terminal region watermark metrics should be collected for the query context.
pub fn should_collect_region_watermark_from_query_ctx(query_ctx: &QueryContextRef) -> Result<bool> {
Ok(
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
.is_some_and(|extensions| extensions.should_collect_region_watermark()),
)
}
/// Attaches terminal region watermark metrics to `stream` when collection is requested.
pub fn maybe_attach_region_watermark_metrics(
stream: SendableRecordBatchStream,
plan: Arc<dyn ExecutionPlan>,
should_collect_region_watermark: bool,
) -> SendableRecordBatchStream {
if should_collect_region_watermark {
Box::pin(RegionWatermarkMetricsStream::new(stream, plan))
} else {
stream
}
}
pub fn terminal_recordbatch_metrics_from_plan(
plan: Arc<dyn ExecutionPlan>,
) -> Option<RecordBatchMetrics> {
@@ -215,6 +239,18 @@ pub fn terminal_recordbatch_metrics_from_plan(
}
}
/// Collects terminal record-batch metrics from `plan` only when requested.
pub fn terminal_recordbatch_metrics_from_plan_if_requested(
plan: Option<Arc<dyn ExecutionPlan>>,
should_collect_region_watermark: bool,
) -> Option<RecordBatchMetrics> {
if should_collect_region_watermark {
plan.and_then(terminal_recordbatch_metrics_from_plan)
} else {
None
}
}
fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermarkEntry> {
let mut merged = BTreeMap::<u64, MergeState>::new();
let mut stack = vec![plan];
@@ -240,6 +276,10 @@ fn merge_region_watermark_entries(
merged: &mut BTreeMap<u64, MergeState>,
entries: impl IntoIterator<Item = RegionWatermarkEntry>,
) {
// Merge by correctness rather than by maximum/minimum sequence. Any
// explicit unproved entry (`None`) vetoes a proved watermark, and
// conflicting proofs degrade the region back to unproved so Flow cannot
// advance checkpoints from ambiguous terminal metrics.
for entry in entries {
merged
.entry(entry.region_id)
@@ -281,6 +321,9 @@ fn merge_merge_scan_region_watermarks(
regions: impl IntoIterator<Item = u64>,
sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
) {
// Regions listed by MergeScanExec participated even when no sub-stage can
// prove a watermark. Keep them as explicit `None` entries so callers can
// distinguish unproved participation from non-participation.
for region_id in regions {
merged.entry(region_id).or_insert(MergeState::Participated);
}

View File

@@ -28,7 +28,9 @@ use async_trait::async_trait;
use bytes::{self, Bytes};
use common_error::ext::ErrorExt;
use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::flight::{
FLOW_EXTENSIONS_METADATA_KEY, FlightDecoder, FlightEncoder, FlightMessage,
};
use common_memory_manager::MemoryGuard;
use common_query::{Output, OutputData};
use common_recordbatch::DfRecordBatch;
@@ -39,7 +41,7 @@ use datatypes::arrow::datatypes::SchemaRef;
use futures::{Stream, future, ready};
use futures_util::{StreamExt, TryStreamExt};
use prost::Message;
use query::metrics::terminal_recordbatch_metrics_from_plan;
use query::metrics::terminal_recordbatch_metrics_from_plan_if_requested;
use query::options::FlowQueryExtensions;
use session::context::{Channel, QueryContextRef};
use snafu::{IntoError, ResultExt, ensure};
@@ -58,8 +60,6 @@ use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::request_memory_metrics::RequestMemoryMetrics;
use crate::{error, hint_headers};
const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions";
pub type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + 'static>>;
/// A subset of [FlightService]
@@ -204,8 +204,11 @@ impl FlightCraft for GreptimeRequestHandler {
// Validate flow hint syntax at the transport boundary before dispatching the request.
// This does not authorize or execute anything; `handle_request()` below still performs
// the normal frontend handling and auth checks before query execution.
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())
let flow_extensions = FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())
.map_err(|e| Status::invalid_argument(e.output_msg()))?;
let should_emit_terminal_metrics = flow_extensions
.as_ref()
.is_some_and(|extensions| extensions.should_collect_region_watermark());
// The Grpc protocol pass query by Flight. It needs to be wrapped under a span, in order to record stream
let span = info_span!(
@@ -221,6 +224,7 @@ impl FlightCraft for GreptimeRequestHandler {
TracingContext::from_current_span(),
flight_compression,
query_ctx,
should_emit_terminal_metrics,
);
Ok(Response::new(stream))
}
@@ -552,6 +556,7 @@ fn to_flight_data_stream(
tracing_context: TracingContext,
flight_compression: FlightCompression,
query_ctx: QueryContextRef,
should_emit_terminal_metrics: bool,
) -> TonicStream<FlightData> {
match output.data {
OutputData::Stream(stream) => {
@@ -573,15 +578,11 @@ fn to_flight_data_stream(
Box::pin(stream) as _
}
OutputData::AffectedRows(rows) => {
let should_emit_terminal_metrics =
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())
.expect("flow extensions must be validated before Flight serialization")
.is_some_and(|extensions| extensions.should_collect_region_watermark());
let terminal_metrics = should_emit_terminal_metrics
.then_some(output.meta.plan)
.flatten()
.and_then(terminal_recordbatch_metrics_from_plan)
.and_then(|metrics| serde_json::to_string(&metrics).ok());
let terminal_metrics = terminal_recordbatch_metrics_from_plan_if_requested(
output.meta.plan,
should_emit_terminal_metrics,
)
.and_then(|metrics| serde_json::to_string(&metrics).ok());
let affected_rows = FlightEncoder::default().encode(FlightMessage::AffectedRows {
rows,
metrics: terminal_metrics,