fix: standalone also monkey patch

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-11 12:28:21 +08:00
parent 4cef82b39c
commit 2078dd9f6e
3 changed files with 123 additions and 23 deletions

View File

@@ -64,9 +64,7 @@ type DoPutResponseStream = Pin<Box<dyn Stream<Item = Result<DoPutResponse>>>>;
/// Terminal metrics associated with a query output.
///
/// For streaming outputs, metrics are only final after the stream is fully
/// drained and [`Self::is_ready`] returns `true`. Region watermark helpers keep
/// the RFC distinction between proved regions, unproved participating regions,
/// and non-participating regions.
/// drained and [`Self::is_ready`] returns `true`.
#[derive(Debug, Clone, Default)]
pub struct OutputMetrics {
inner: Arc<OutputMetricsInner>,
@@ -285,7 +283,7 @@ where
}
Some(FlightMessage::Metrics(_)) => {
return IllegalFlightMessagesSnafu {
reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message".to_string(),
reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message",
}
.fail();
}

View File

@@ -295,19 +295,9 @@ impl RegionServer {
)
.await?;
let region_latest_seq =
if should_collect_region_watermark_from_extensions(&query_ctx.extensions()) {
query_ctx.get_snapshot(region_id.as_u64())
} else {
None
};
if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
} else {
Ok(stream)
}
Ok(wrap_flow_region_watermark_stream(
stream, region_id, &query_ctx,
))
}
#[tracing::instrument(skip_all)]
@@ -333,9 +323,15 @@ impl RegionServer {
.context(DataFusionSnafu)?
.data;
self.inner
.handle_read(QueryRequest { plan, ..request }, query_ctx)
.await
let region_id = request.region_id;
let stream = self
.inner
.handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
.await?;
Ok(wrap_flow_region_watermark_stream(
stream, region_id, &query_ctx,
))
}
/// Returns all opened and reportable regions.
@@ -769,6 +765,21 @@ impl RegionServer {
}
}
/// Optionally decorates a region read stream with a [`RegionWatermarkStream`].
///
/// The wrapper is applied only when
/// `should_collect_region_watermark_from_extensions` accepts the query
/// context's extensions *and* the context holds a snapshot for `region_id`.
/// In every other case `stream` is handed back untouched.
fn wrap_flow_region_watermark_stream(
    stream: SendableRecordBatchStream,
    region_id: RegionId,
    query_ctx: &QueryContextRef,
) -> SendableRecordBatchStream {
    // The snapshot is looked up only after the extension check passes, so a
    // query that did not opt in never touches the snapshot table.
    let wants_watermark =
        should_collect_region_watermark_from_extensions(&query_ctx.extensions());
    let snapshot_seq = if wants_watermark {
        query_ctx.get_snapshot(region_id.as_u64())
    } else {
        None
    };

    match snapshot_seq {
        None => stream,
        Some(seq) => Box::pin(RegionWatermarkStream::new(stream, region_id, seq)),
    }
}
/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
struct RegionWatermarkStream {
stream: SendableRecordBatchStream,
@@ -832,6 +843,10 @@ impl RecordBatchStream for RegionWatermarkStream {
impl Stream for RegionWatermarkStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
/// Reports the size hint of the wrapped stream without modification.
fn size_hint(&self) -> (usize, Option<usize>) {
    let (lower, upper) = self.stream.size_hint();
    (lower, upper)
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(None) => {
@@ -1763,6 +1778,7 @@ impl RegionAttribute {
mod tests {
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
@@ -1774,6 +1790,7 @@ mod tests {
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use mito2::test_util::CreateRequestBuilder;
use query::options::FLOW_RETURN_REGION_SEQ;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
@@ -1785,6 +1802,19 @@ mod tests {
use crate::error::Result;
use crate::tests::{MockRegionEngine, mock_region_server};
/// Test fixture: a record batch stream built from a single batch whose only
/// column, `v` (int32), holds the value `1`.
fn single_value_stream() -> SendableRecordBatchStream {
    let column = ColumnSchema::new("v", ConcreteDataType::int32_datatype(), false);
    let schema = Arc::new(Schema::new(vec![column]));

    let column_data: VectorRef = Arc::new(Int32Vector::from_slice([1]));
    let single_batch = RecordBatch::new(schema.clone(), vec![column_data]).unwrap();

    let batches = RecordBatches::try_new(schema, vec![single_batch]).unwrap();
    batches.as_stream()
}
#[tokio::test]
async fn test_region_watermark_stream_only_sets_terminal_metrics() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
@@ -1848,6 +1878,68 @@ mod tests {
);
}
// With the flow extension set to "true" and a snapshot recorded for the
// region, the fully drained stream must expose that snapshot as the region's
// watermark in its metrics.
#[tokio::test]
async fn test_wrap_flow_region_watermark_stream_adds_terminal_metrics() {
    let region_id = RegionId::new(42, 7);

    let extensions = HashMap::from([(
        FLOW_RETURN_REGION_SEQ.to_string(),
        "true".to_string(),
    )]);
    let query_ctx = Arc::new(QueryContextBuilder::default().extensions(extensions).build());
    query_ctx.set_snapshot(region_id.as_u64(), 99);

    let mut drained = Box::pin(wrap_flow_region_watermark_stream(
        single_value_stream(),
        region_id,
        &query_ctx,
    ));
    // Exhaust the stream before inspecting its metrics.
    while drained.next().await.is_some() {}

    let expected = vec![RegionWatermarkEntry {
        region_id: region_id.as_u64(),
        watermark: Some(99),
    }];
    let metrics = drained.as_ref().get_ref().metrics().unwrap();
    assert_eq!(metrics.region_watermarks, expected);
}
// A snapshot alone is not enough: without the flow extension in the query
// context the drained stream must report no metrics.
#[tokio::test]
async fn test_wrap_flow_region_watermark_stream_skips_without_extension() {
    let region_id = RegionId::new(42, 7);

    let default_ctx = QueryContextBuilder::default().build();
    let query_ctx = Arc::new(default_ctx);
    query_ctx.set_snapshot(region_id.as_u64(), 99);

    let mut drained = Box::pin(wrap_flow_region_watermark_stream(
        single_value_stream(),
        region_id,
        &query_ctx,
    ));
    // Exhaust the stream before inspecting its metrics.
    while drained.next().await.is_some() {}

    let metrics = drained.as_ref().get_ref().metrics();
    assert!(metrics.is_none());
}
// The flow extension alone is not enough: when no snapshot was recorded for
// the region the drained stream must report no metrics.
#[tokio::test]
async fn test_wrap_flow_region_watermark_stream_skips_without_snapshot() {
    let region_id = RegionId::new(42, 7);

    let extensions = HashMap::from([(
        FLOW_RETURN_REGION_SEQ.to_string(),
        "true".to_string(),
    )]);
    let query_ctx = Arc::new(QueryContextBuilder::default().extensions(extensions).build());

    let mut drained = Box::pin(wrap_flow_region_watermark_stream(
        single_value_stream(),
        region_id,
        &query_ctx,
    ));
    // Exhaust the stream before inspecting its metrics.
    while drained.next().await.is_some() {}

    let metrics = drained.as_ref().get_ref().metrics();
    assert!(metrics.is_none());
}
#[tokio::test]
async fn test_region_registering() {
common_telemetry::init_default_ut_logging();

View File

@@ -578,11 +578,21 @@ fn to_flight_data_stream(
Box::pin(stream) as _
}
OutputData::AffectedRows(rows) => {
let terminal_metrics = terminal_recordbatch_metrics_from_plan_if_requested(
let terminal_metrics = match terminal_recordbatch_metrics_from_plan_if_requested(
output.meta.plan,
should_emit_terminal_metrics,
)
.and_then(|metrics| serde_json::to_string(&metrics).ok());
) {
Some(metrics) => match serde_json::to_string(&metrics) {
Ok(metrics) => Some(metrics),
Err(e) => {
let stream = tokio_stream::once(Err(Status::internal(format!(
"Failed to serialize terminal metrics: {e}"
))));
return Box::pin(stream) as _;
}
},
None => None,
};
let affected_rows = FlightEncoder::default().encode(FlightMessage::AffectedRows {
rows,
metrics: terminal_metrics,