diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e1985318df..13ff9fc651 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -64,9 +64,7 @@ type DoPutResponseStream = Pin>>>; /// Terminal metrics associated with a query output. /// /// For streaming outputs, metrics are only final after the stream is fully -/// drained and [`Self::is_ready`] returns `true`. Region watermark helpers keep -/// the RFC distinction between proved regions, unproved participating regions, -/// and non-participating regions. +/// drained and [`Self::is_ready`] returns `true`. #[derive(Debug, Clone, Default)] pub struct OutputMetrics { inner: Arc, @@ -285,7 +283,7 @@ where } Some(FlightMessage::Metrics(_)) => { return IllegalFlightMessagesSnafu { - reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message".to_string(), + reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message", } .fail(); } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index f16c83a84b..ac8cf4df26 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -295,19 +295,9 @@ impl RegionServer { ) .await?; - let region_latest_seq = - if should_collect_region_watermark_from_extensions(&query_ctx.extensions()) { - query_ctx.get_snapshot(region_id.as_u64()) - } else { - None - }; - - if let Some(seq) = region_latest_seq { - Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) - as SendableRecordBatchStream) - } else { - Ok(stream) - } + Ok(wrap_flow_region_watermark_stream( + stream, region_id, &query_ctx, + )) } #[tracing::instrument(skip_all)] @@ -333,9 +323,15 @@ impl RegionServer { .context(DataFusionSnafu)? .data; - self.inner - .handle_read(QueryRequest { plan, ..request }, query_ctx) - .await + let region_id = request.region_id; + let stream = self + .inner + .handle_read(QueryRequest { plan, ..request }, query_ctx.clone()) + .await?; + + Ok(wrap_flow_region_watermark_stream( + stream, region_id, &query_ctx, + )) } /// Returns all opened and reportable regions. @@ -769,6 +765,21 @@ impl RegionServer { } } +fn wrap_flow_region_watermark_stream( + stream: SendableRecordBatchStream, + region_id: RegionId, + query_ctx: &QueryContextRef, +) -> SendableRecordBatchStream { + let Some(seq) = should_collect_region_watermark_from_extensions(&query_ctx.extensions()) + .then(|| query_ctx.get_snapshot(region_id.as_u64())) + .flatten() + else { + return stream; + }; + + Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) +} + /// Wraps a region read stream so terminal metrics can carry the scan-open watermark. struct RegionWatermarkStream { stream: SendableRecordBatchStream, @@ -832,6 +843,10 @@ impl RecordBatchStream for RegionWatermarkStream { impl Stream for RegionWatermarkStream { type Item = common_recordbatch::error::Result; + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(None) => { @@ -1763,6 +1778,7 @@ impl RegionAttribute { mod tests { use std::assert_matches; + use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; @@ -1774,6 +1790,7 @@ mod tests { use datatypes::vectors::Int32Vector; use futures_util::StreamExt; use mito2::test_util::CreateRequestBuilder; + use query::options::FLOW_RETURN_REGION_SEQ; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ @@ -1785,6 +1802,19 @@ mod tests { use crate::error::Result; use crate::tests::{MockRegionEngine, mock_region_server}; + fn single_value_stream() -> SendableRecordBatchStream { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1])); + let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap(); + RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream() + } + #[tokio::test] async fn test_region_watermark_stream_only_sets_terminal_metrics() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( @@ -1848,6 +1878,68 @@ mod tests { ); } + #[tokio::test] + async fn test_wrap_flow_region_watermark_stream_adds_terminal_metrics() { + let region_id = RegionId::new(42, 7); + let query_ctx = Arc::new( + QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_RETURN_REGION_SEQ.to_string(), + "true".to_string(), + )])) + .build(), + ); + query_ctx.set_snapshot(region_id.as_u64(), 99); + + let wrapped = + wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx); + let mut pinned = Box::pin(wrapped); + while pinned.next().await.is_some() {} + + let metrics = pinned.as_ref().get_ref().metrics().unwrap(); + assert_eq!( + metrics.region_watermarks, + vec![RegionWatermarkEntry { + region_id: region_id.as_u64(), + watermark: Some(99), + }] + ); + } + + #[tokio::test] + async fn test_wrap_flow_region_watermark_stream_skips_without_extension() { + let region_id = RegionId::new(42, 7); + let query_ctx = Arc::new(QueryContextBuilder::default().build()); + query_ctx.set_snapshot(region_id.as_u64(), 99); + + let wrapped = + wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx); + let mut pinned = Box::pin(wrapped); + while pinned.next().await.is_some() {} + + assert!(pinned.as_ref().get_ref().metrics().is_none()); + } + + #[tokio::test] + async fn test_wrap_flow_region_watermark_stream_skips_without_snapshot() { + let region_id = RegionId::new(42, 7); + let query_ctx = Arc::new( + QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_RETURN_REGION_SEQ.to_string(), + "true".to_string(), + )])) + .build(), + ); + + let wrapped = + wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx); + let mut pinned = Box::pin(wrapped); + while pinned.next().await.is_some() {} + + assert!(pinned.as_ref().get_ref().metrics().is_none()); + } + #[tokio::test] async fn test_region_registering() { common_telemetry::init_default_ut_logging(); diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index a2ff787d1b..21a03a41d3 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -578,11 +578,21 @@ fn to_flight_data_stream( Box::pin(stream) as _ } OutputData::AffectedRows(rows) => { - let terminal_metrics = terminal_recordbatch_metrics_from_plan_if_requested( + let terminal_metrics = match terminal_recordbatch_metrics_from_plan_if_requested( output.meta.plan, should_emit_terminal_metrics, - ) - .and_then(|metrics| serde_json::to_string(&metrics).ok()); + ) { + Some(metrics) => match serde_json::to_string(&metrics) { + Ok(metrics) => Some(metrics), + Err(e) => { + let stream = tokio_stream::once(Err(Status::internal(format!( + "Failed to serialize terminal metrics: {e}" + )))); + return Box::pin(stream) as _; + } + }, + None => None, + }; let affected_rows = FlightEncoder::default().encode(FlightMessage::AffectedRows { rows, metrics: terminal_metrics,