diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 6a7ac62fc3..7a07f478bf 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -25,6 +25,7 @@ use api::v1::{ AlterTableExpr, AuthHeader, Basic, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests, QueryRequest, RequestHeader, RowInsertRequests, }; +use arc_swap::ArcSwapOption; use arrow_flight::{FlightData, Ticket}; use async_stream::stream; use base64::Engine; @@ -35,6 +36,7 @@ use common_error::ext::BoxedError; use common_grpc::flight::do_put::DoPutResponse; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper}; use common_telemetry::tracing::Span; @@ -424,24 +426,76 @@ impl Database { .fail() } FlightMessage::Schema(schema) => { + let metrics = Arc::new(ArcSwapOption::from(None)); + let metrics_ref = metrics.clone(); let schema = Arc::new( datatypes::schema::Schema::try_from(schema) .context(error::ConvertSchemaSnafu)?, ); let schema_cloned = schema.clone(); let stream = Box::pin(stream!({ - while let Some(flight_message) = flight_message_stream.next().await { - let flight_message = flight_message - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + let mut buffered_message: Option = None; + let mut stream_ended = false; + + while !stream_ended { + let flight_message_item = if let Some(msg) = buffered_message.take() { + Some(Ok(msg)) + } else { + flight_message_stream.next().await + }; + + let flight_message = match flight_message_item { + Some(Ok(message)) => message, + Some(Err(e)) => { + yield Err(BoxedError::new(e)).context(ExternalSnafu); + break; + } + None => break, + }; + match flight_message { FlightMessage::RecordBatch(arrow_batch) => { - yield Ok(RecordBatch::from_df_record_batch( + let result_to_yield = RecordBatch::from_df_record_batch( schema_cloned.clone(), arrow_batch, - )) + ); + + if let Some(next_flight_message_result) = + flight_message_stream.next().await + { + match next_flight_message_result { + Ok(FlightMessage::Metrics(s)) => { + let m: Option> = + serde_json::from_str(&s).ok().map(Arc::new); + metrics_ref.swap(m); + } + Ok(FlightMessage::RecordBatch(rb)) => { + buffered_message = Some(FlightMessage::RecordBatch(rb)); + } + Ok(_) => { + yield IllegalFlightMessagesSnafu {reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"} + .fail() + .map_err(BoxedError::new) + .context(ExternalSnafu); + break; + } + Err(e) => { + yield Err(BoxedError::new(e)).context(ExternalSnafu); + break; + } + } + } else { + stream_ended = true; + } + + yield Ok(result_to_yield) + } + FlightMessage::Metrics(s) => { + let m: Option> = + serde_json::from_str(&s).ok().map(Arc::new); + metrics_ref.swap(m); + break; } - FlightMessage::Metrics(_) => {} FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} .fail() @@ -456,7 +510,7 @@ impl Database { schema, stream, output_ordering: None, - metrics: Default::default(), + metrics, span: Span::current(), }; Ok(Output::new_with_stream(Box::pin(record_batch_stream))) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index fc12d87dcf..1c23ab10c2 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -446,6 +446,8 @@ pub struct RecordBatchMetrics { // Detailed per-plan metrics /// An ordered list of plan metrics, from top to bottom in post-order. pub plan_metrics: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub region_latest_sequences: Option>, } /// Determines if a metric name represents a time measurement that should be formatted. diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 9d675aa276..662ec43b9d 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -17,8 +17,10 @@ mod catalog; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; use std::time::Duration; use api::region::RegionResponse; @@ -36,7 +38,8 @@ use common_error::status_code::StatusCode; use common_meta::datanode::TopicStatsReporter; use common_query::OutputData; use common_query::request::QueryRequest; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::adapter::RecordBatchMetrics; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_runtime::Runtime; use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; @@ -45,6 +48,7 @@ use dashmap::DashMap; use datafusion::datasource::TableProvider; use datafusion_common::tree_node::TreeNode; use either::Either; +use futures_util::Stream; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; use mito2::engine::{MITO_ENGINE_NAME, MitoEngine}; @@ -53,6 +57,7 @@ use query::QueryEngineRef; pub use query::dummy_catalog::{ DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef, }; +use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_RETURN_REGION_SEQ}; use serde_json; use servers::error::{ self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu, @@ -256,6 +261,20 @@ impl RegionServer { }; let region_id = RegionId::from_u64(request.region_id); + let should_return_region_seq = should_return_region_seq(&query_ctx); + let region_latest_seq = if should_return_region_seq { + let engine = self + .find_engine(region_id)? + .context(RegionNotFoundSnafu { region_id })?; + Some( + engine + .get_committed_sequence(region_id) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id })?, + ) + } else { + None + }; let catalog_list = Arc::new(NameAwareCatalogList::new( self.clone(), region_id, @@ -278,7 +297,8 @@ impl RegionServer { .await .context(DecodeLogicalPlanSnafu)?; - self.inner + let stream = self + .inner .handle_read( QueryRequest { header: request.header, @@ -287,7 +307,14 @@ impl RegionServer { }, query_ctx, ) - .await + .await?; + + if let Some(seq) = region_latest_seq { + Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) + as SendableRecordBatchStream) + } else { + Ok(stream) + } } #[tracing::instrument(skip_all)] @@ -748,6 +775,143 @@ impl RegionServer { } } +fn should_return_region_seq(query_ctx: &QueryContext) -> bool { + query_ctx + .extension(FLOW_RETURN_REGION_SEQ) + .is_some_and(|value| value.eq_ignore_ascii_case("true")) + || query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some() +} + +struct RegionWatermarkStream { + stream: SendableRecordBatchStream, + region_latest_sequence: (u64, u64), + finished: AtomicBool, +} + +impl RegionWatermarkStream { + fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self { + Self { + stream, + region_latest_sequence: (region_id.as_u64(), latest_sequence), + finished: AtomicBool::new(false), + } + } + + fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics { + let region_latest_sequences = metrics.region_latest_sequences.get_or_insert_with(Vec::new); + if let Some((_, seq)) = region_latest_sequences + .iter_mut() + .find(|(region_id, _)| *region_id == self.region_latest_sequence.0) + { + *seq = self.region_latest_sequence.1; + } else { + region_latest_sequences.push(self.region_latest_sequence); + } + metrics + } +} + +impl RecordBatchStream for RegionWatermarkStream { + fn name(&self) -> &str { + self.stream.name() + } + + fn schema(&self) -> datatypes::schema::SchemaRef { + self.stream.schema() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.stream.output_ordering() + } + + fn metrics(&self) -> Option { + let base = self.stream.metrics(); + if !self.finished.load(Ordering::Relaxed) { + return base; + } + + Some(self.merged_metrics(base.unwrap_or_default())) + } +} + +impl Stream for RegionWatermarkStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(None) => { + self.finished.store(true, Ordering::Relaxed); + Poll::Ready(None) + } + other => other, + } + } +} + +#[cfg(test)] +mod watermark_tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_recordbatch::{RecordBatch, RecordBatches}; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + use futures_util::StreamExt; + use session::context::QueryContextBuilder; + + use super::*; + + #[test] + fn test_should_return_region_seq() { + let ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_RETURN_REGION_SEQ.to_string(), + "true".to_string(), + )])) + .build(); + assert!(should_return_region_seq(&ctx)); + + let ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + )])) + .build(); + assert!(should_return_region_seq(&ctx)); + + let ctx = QueryContextBuilder::default().build(); + assert!(!should_return_region_seq(&ctx)); + } + + #[tokio::test] + async fn test_region_watermark_stream_only_sets_terminal_metrics() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2])); + let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap(); + let stream = RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream(); + + let region_id = RegionId::new(42, 7); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99); + let mut pinned = Box::pin(wrapped); + + assert!(pinned.as_ref().get_ref().metrics().is_none()); + while pinned.next().await.is_some() {} + + let metrics = pinned.as_ref().get_ref().metrics().unwrap(); + assert_eq!( + metrics.region_latest_sequences, + Some(vec![(region_id.as_u64(), 99)]) + ); + } +} + #[async_trait] impl RegionServerHandler for RegionServer { async fn handle(&self, request: region_request::Body) -> ServerResult { diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index d148e6aa1b..6f78d23e14 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -399,7 +399,7 @@ impl ErrorExt for Error { Error::PrometheusLabelValuesQueryPlan { source, .. } => source.status_code(), - Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery, + Error::CollectRecordbatch { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ce589bb677..7b4c6adee0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -31,6 +31,7 @@ use std::sync::atomic::AtomicBool; use std::sync::{Arc, atomic}; use std::time::{Duration, SystemTime}; +use arc_swap::ArcSwapOption; use async_stream::stream; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -492,9 +493,12 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result { OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output, OutputData::Stream(mut stream) => { let schema = stream.schema(); + let metrics = Arc::new(ArcSwapOption::from(stream.metrics().map(Arc::new))); + let metrics_ref = metrics.clone(); let s = Box::pin(stream! { let mut start = tokio::time::Instant::now(); while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? { + metrics_ref.swap(stream.metrics().map(Arc::new)); yield item; let now = tokio::time::Instant::now(); @@ -505,12 +509,13 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result { StreamTimeoutSnafu.fail()?; } } + metrics_ref.swap(stream.metrics().map(Arc::new)); }) as Pin + Send>>; let stream = RecordBatchStreamWrapper { schema, stream: s, output_ordering: None, - metrics: Default::default(), + metrics, span: Span::current(), }; Output::new(OutputData::Stream(Box::pin(stream)), output.meta) @@ -1131,12 +1136,22 @@ fn should_capture_statement(stmt: Option<&Statement>) -> bool { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier}; + use std::task::{Context, Poll}; use std::thread; use std::time::{Duration, Instant}; use common_base::Plugins; + use common_query::{Output, OutputData}; + use common_recordbatch::adapter::RecordBatchMetrics; + use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; + use datatypes::data_type::ConcreteDataType; + use datatypes::prelude::VectorRef; + use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + use datatypes::vectors::Int32Vector; + use futures::{Stream, StreamExt}; use query::query_engine::options::QueryOptions; use session::context::QueryContext; use sql::dialect::GreptimeDbDialect; @@ -1381,4 +1396,109 @@ mod tests { assert_eq!(result.is_ok(), is_ok); } } + + struct MockMetricsStream { + schema: SchemaRef, + batch: Option, + metrics: RecordBatchMetrics, + terminal_metrics_only: bool, + } + + impl Stream for MockMetricsStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.batch.take().map(Ok)) + } + } + + impl RecordBatchStream for MockMetricsStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + if self.terminal_metrics_only && self.batch.is_some() { + return None; + } + Some(self.metrics.clone()) + } + } + + #[tokio::test] + async fn test_attach_timeout_preserves_stream_metrics() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let batch = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef], + ) + .unwrap(); + let expected_metrics = RecordBatchMetrics { + region_latest_sequences: Some(vec![(42, 99)]), + ..Default::default() + }; + let output = Output::new_with_stream(Box::pin(MockMetricsStream { + schema, + batch: Some(batch), + metrics: expected_metrics.clone(), + terminal_metrics_only: false, + })); + + let output = attach_timeout(output, Duration::from_secs(1)).unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + assert_eq!( + stream.metrics().and_then(|m| m.region_latest_sequences), + expected_metrics.region_latest_sequences.clone() + ); + while stream.next().await.is_some() {} + assert_eq!( + stream.metrics().and_then(|m| m.region_latest_sequences), + expected_metrics.region_latest_sequences + ); + } + + #[tokio::test] + async fn test_attach_timeout_preserves_terminal_only_metrics() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let batch = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef], + ) + .unwrap(); + let expected_metrics = RecordBatchMetrics { + region_latest_sequences: Some(vec![(7, 123)]), + ..Default::default() + }; + let output = Output::new_with_stream(Box::pin(MockMetricsStream { + schema, + batch: Some(batch), + metrics: expected_metrics.clone(), + terminal_metrics_only: true, + })); + + let output = attach_timeout(output, Duration::from_secs(1)).unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + assert!(stream.metrics().is_none()); + while stream.next().await.is_some() {} + assert_eq!( + stream.metrics().and_then(|m| m.region_latest_sequences), + expected_metrics.region_latest_sequences + ); + } } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index dc84c4afac..79223d590c 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -59,7 +59,8 @@ use crate::error::{ TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; -use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED}; +use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED, RegionWatermarkMetricsStream}; +use crate::options::FlowQueryExtensions; use crate::physical_wrapper::PhysicalPlanWrapperRef; use crate::planner::{DfLogicalPlanner, LogicalPlanner}; use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState}; @@ -586,6 +587,11 @@ impl QueryExecutor for DatafusionQueryEngine { plan: &Arc, ) -> Result { let explain_verbose = ctx.query_ctx().explain_verbose(); + let should_collect_region_watermark = + FlowQueryExtensions::from_extensions(&ctx.query_ctx().extensions()) + .map(|extensions| extensions.should_collect_region_watermark()) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; let output_partitions = plan.properties().output_partitioning().partition_count(); if explain_verbose { common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}"); @@ -621,7 +627,14 @@ impl QueryExecutor for DatafusionQueryEngine { ); } }); - Ok(Box::pin(stream)) + if should_collect_region_watermark { + Ok(Box::pin(RegionWatermarkMetricsStream::new( + Box::pin(stream), + plan.clone(), + ))) + } else { + Ok(Box::pin(stream)) + } } _ => { // merge into a single partition @@ -650,7 +663,14 @@ impl QueryExecutor for DatafusionQueryEngine { ); } }); - Ok(Box::pin(stream)) + if should_collect_region_watermark { + Ok(Box::pin(RegionWatermarkMetricsStream::new( + Box::pin(stream), + plan.clone(), + ))) + } else { + Ok(Box::pin(stream)) + } } } } diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index e0d02e9a3d..a5ed020f38 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -13,16 +13,20 @@ // limitations under the License. use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use datafusion::physical_plan::ExecutionPlan; use datatypes::schema::SchemaRef; use futures::Stream; use futures_util::ready; use lazy_static::lazy_static; use prometheus::*; +use crate::dist_plan::MergeScanExec; + lazy_static! { /// Timer of different stages in query. pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( @@ -129,3 +133,72 @@ impl Stream for OnDone { self.stream.size_hint() } } + +pub struct RegionWatermarkMetricsStream { + stream: SendableRecordBatchStream, + plan: Arc, +} + +impl RegionWatermarkMetricsStream { + pub fn new(stream: SendableRecordBatchStream, plan: Arc) -> Self { + Self { stream, plan } + } +} + +impl RecordBatchStream for RegionWatermarkMetricsStream { + fn name(&self) -> &str { + self.stream.name() + } + + fn schema(&self) -> SchemaRef { + self.stream.schema() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.stream.output_ordering() + } + + fn metrics(&self) -> Option { + let mut metrics = self.stream.metrics()?; + let region_latest_sequences = collect_region_latest_sequences(self.plan.clone()); + if !region_latest_sequences.is_empty() { + metrics.region_latest_sequences = Some(region_latest_sequences); + } + Some(metrics) + } +} + +impl Stream for RegionWatermarkMetricsStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +fn collect_region_latest_sequences(plan: Arc) -> Vec<(u64, u64)> { + let mut merged = std::collections::HashMap::new(); + let mut stack = vec![plan]; + + while let Some(plan) = stack.pop() { + if let Some(merge_scan) = plan.as_any().downcast_ref::() { + for metrics in merge_scan.sub_stage_metrics() { + if let Some(region_latest_sequences) = metrics.region_latest_sequences { + for (region_id, seq) in region_latest_sequences { + merged.insert(region_id, seq); + } + } + } + } + + stack.extend(plan.children().into_iter().cloned()); + } + + let mut region_latest_sequences = merged.into_iter().collect::>(); + region_latest_sequences.sort_unstable_by_key(|(region_id, _)| *region_id); + region_latest_sequences +} diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 0bcfbb2041..dd1372ce7b 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -157,6 +157,10 @@ impl FlowQueryExtensions { Ok(self.incremental_after_seqs.is_some()) } + + pub fn should_collect_region_watermark(&self) -> bool { + self.return_region_seq || self.incremental_after_seqs.is_some() + } } fn parse_incremental_after_seqs(value: &str) -> Result> { @@ -368,4 +372,28 @@ mod flow_extension_tests { let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap(); assert!(!apply_incremental); } + + #[test] + fn test_should_collect_region_watermark_defaults_false() { + let parsed = FlowQueryExtensions::default(); + assert!(!parsed.should_collect_region_watermark()); + } + + #[test] + fn test_should_collect_region_watermark_true_for_return_region_seq() { + let parsed = FlowQueryExtensions { + return_region_seq: true, + ..Default::default() + }; + assert!(parsed.should_collect_region_watermark()); + } + + #[test] + fn test_should_collect_region_watermark_true_for_incremental_query() { + let parsed = FlowQueryExtensions { + incremental_after_seqs: Some(HashMap::from([(1, 10)])), + ..Default::default() + }; + assert!(parsed.should_collect_region_watermark()); + } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 18ac964f05..42d120fdf5 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -703,7 +703,7 @@ impl ErrorExt for Error { #[cfg(not(windows))] UpdateJemallocMetrics { .. } => StatusCode::Internal, - CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery, + CollectRecordbatch { source, .. } => source.status_code(), ExecuteQuery { source, .. } | ExecutePlan { source, .. } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 9ed6b8176f..ec00f0e526 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -23,10 +23,13 @@ mod test { use auth::user_provider_from_option; use client::{Client, Database}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; use common_grpc::flight::do_put::DoPutMetadata; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::OutputData; use common_recordbatch::RecordBatch; + use common_recordbatch::adapter::RecordBatchMetrics; use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Int32Vector, StringVector, TimestampMillisecondVector}; @@ -129,6 +132,141 @@ mod test { | 1970-01-01T00:00:00.009 | -9 | s9 | +-------------------------+----+----+"; query_and_expect(db.fe_instance().as_ref(), sql, expected).await; + + let output = client.sql(sql).await.unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + let metrics = stream.metrics().expect("expected terminal metrics"); + assert!(metrics.region_latest_sequences.is_none()); + + let output = client + .sql_with_hint(sql, &[("flow.return_region_seq", "true")]) + .await + .unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + + let mut row_count = 0; + while let Some(batch) = stream.next().await { + let batch = batch.unwrap(); + row_count += batch.num_rows(); + } + assert_eq!(row_count, 9); + + let RecordBatchMetrics { + region_latest_sequences, + .. + } = stream.metrics().expect("expected terminal metrics"); + let region_latest_sequences = + region_latest_sequences.expect("expected region watermark metrics"); + assert_eq!(region_latest_sequences.len(), 1); + assert!(region_latest_sequences[0].0 > 0); + assert!(region_latest_sequences[0].1 > 0); + + let previous_watermark = region_latest_sequences[0]; + + let incremental_batches = create_record_batches(10); + test_put_record_batches(&client, incremental_batches).await; + + let output = client + .sql_with_hint( + sql, + &[ + ("flow.incremental_mode", "memtable_only"), + ( + "flow.incremental_after_seqs", + &format!( + r#"{{"{}":"{}"}}"#, + previous_watermark.0, previous_watermark.1 + ), + ), + ("flow.return_region_seq", "true"), + ], + ) + .await + .unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + + let schema = stream.schema(); + let mut rows = Vec::new(); + while let Some(batch) = stream.next().await { + rows.push(batch.unwrap()); + } + let pretty = common_recordbatch::RecordBatches::try_new(schema, rows) + .unwrap() + .pretty_print() + .unwrap(); + assert_eq!( + pretty, + "\ ++-------------------------+-----+-----+ +| ts | a | B | ++-------------------------+-----+-----+ +| 1970-01-01T00:00:00.010 | -10 | s10 | +| 1970-01-01T00:00:00.011 | -11 | s11 | +| 1970-01-01T00:00:00.012 | -12 | s12 | +| 1970-01-01T00:00:00.013 | -13 | s13 | +| 1970-01-01T00:00:00.014 | -14 | s14 | +| 1970-01-01T00:00:00.015 | -15 | s15 | +| 1970-01-01T00:00:00.016 | -16 | s16 | +| 1970-01-01T00:00:00.017 | -17 | s17 | +| 1970-01-01T00:00:00.018 | -18 | s18 | ++-------------------------+-----+-----+" + ); + + let RecordBatchMetrics { + region_latest_sequences, + .. + } = stream + .metrics() + .expect("expected terminal incremental metrics"); + let region_latest_sequences = + region_latest_sequences.expect("expected incremental region watermark metrics"); + assert_eq!(region_latest_sequences.len(), 1); + assert_eq!(region_latest_sequences[0].0, previous_watermark.0); + assert!(region_latest_sequences[0].1 > previous_watermark.1); + + client.sql("admin flush_table('foo')").await.unwrap(); + + let output = client + .sql_with_hint( + sql, + &[ + ("flow.incremental_mode", "memtable_only"), + ( + "flow.incremental_after_seqs", + &format!( + r#"{{"{}":"{}"}}"#, + previous_watermark.0, previous_watermark.1 + ), + ), + ("flow.return_region_seq", "true"), + ], + ) + .await + .unwrap(); + let OutputData::Stream(mut stream) = output.data else { + panic!("expected stream output"); + }; + let err = loop { + match stream.next().await { + Some(Err(err)) => break err, + Some(Ok(_)) => continue, + None => panic!("expected stale incremental query to fail while consuming stream"), + } + }; + assert_eq!(err.status_code(), StatusCode::EngineExecuteQuery); + let err_msg = format!("{err:?}"); + assert!(err_msg.contains("STALE_CURSOR")); + assert!(err_msg.contains(&previous_watermark.0.to_string())); + assert!(err_msg.contains(&previous_watermark.1.to_string())); } async fn test_put_record_batches(client: &Database, record_batches: Vec) {