feat: dn2fe2client seq in metrics

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-17 02:11:54 +08:00
parent 8541e0d096
commit da6eb85bbb
10 changed files with 616 additions and 17 deletions

View File

@@ -25,6 +25,7 @@ use api::v1::{
AlterTableExpr, AuthHeader, Basic, CreateTableExpr, DdlRequest, GreptimeRequest,
InsertRequests, QueryRequest, RequestHeader, RowInsertRequests,
};
use arc_swap::ArcSwapOption;
use arrow_flight::{FlightData, Ticket};
use async_stream::stream;
use base64::Engine;
@@ -35,6 +36,7 @@ use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
use common_telemetry::tracing::Span;
@@ -424,24 +426,76 @@ impl Database {
.fail()
}
FlightMessage::Schema(schema) => {
let metrics = Arc::new(ArcSwapOption::from(None));
let metrics_ref = metrics.clone();
let schema = Arc::new(
datatypes::schema::Schema::try_from(schema)
.context(error::ConvertSchemaSnafu)?,
);
let schema_cloned = schema.clone();
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut buffered_message: Option<FlightMessage> = None;
let mut stream_ended = false;
while !stream_ended {
let flight_message_item = if let Some(msg) = buffered_message.take() {
Some(Ok(msg))
} else {
flight_message_stream.next().await
};
let flight_message = match flight_message_item {
Some(Ok(message)) => message,
Some(Err(e)) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
None => break,
};
match flight_message {
FlightMessage::RecordBatch(arrow_batch) => {
yield Ok(RecordBatch::from_df_record_batch(
let result_to_yield = RecordBatch::from_df_record_batch(
schema_cloned.clone(),
arrow_batch,
))
);
if let Some(next_flight_message_result) =
flight_message_stream.next().await
{
match next_flight_message_result {
Ok(FlightMessage::Metrics(s)) => {
let m: Option<Arc<RecordBatchMetrics>> =
serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
}
Ok(FlightMessage::RecordBatch(rb)) => {
buffered_message = Some(FlightMessage::RecordBatch(rb));
}
Ok(_) => {
yield IllegalFlightMessagesSnafu {reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
}
Err(e) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
}
} else {
stream_ended = true;
}
yield Ok(result_to_yield)
}
FlightMessage::Metrics(s) => {
let m: Option<Arc<RecordBatchMetrics>> =
serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;
}
FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
.fail()
@@ -456,7 +510,7 @@ impl Database {
schema,
stream,
output_ordering: None,
metrics: Default::default(),
metrics,
span: Span::current(),
};
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))

View File

@@ -446,6 +446,8 @@ pub struct RecordBatchMetrics {
// Detailed per-plan metrics
/// An ordered list of plan metrics, from top to bottom in post-order.
pub plan_metrics: Vec<PlanMetrics>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub region_latest_sequences: Option<Vec<(u64, u64)>>,
}
/// Determines if a metric name represents a time measurement that should be formatted.

View File

@@ -17,8 +17,10 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use api::region::RegionResponse;
@@ -36,7 +38,8 @@ use common_error::status_code::StatusCode;
use common_meta::datanode::TopicStatsReporter;
use common_query::OutputData;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
@@ -45,6 +48,7 @@ use dashmap::DashMap;
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
use either::Either;
use futures_util::Stream;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
@@ -53,6 +57,7 @@ use query::QueryEngineRef;
pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_RETURN_REGION_SEQ};
use serde_json;
use servers::error::{
self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
@@ -256,6 +261,20 @@ impl RegionServer {
};
let region_id = RegionId::from_u64(request.region_id);
let should_return_region_seq = should_return_region_seq(&query_ctx);
let region_latest_seq = if should_return_region_seq {
let engine = self
.find_engine(region_id)?
.context(RegionNotFoundSnafu { region_id })?;
Some(
engine
.get_committed_sequence(region_id)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?,
)
} else {
None
};
let catalog_list = Arc::new(NameAwareCatalogList::new(
self.clone(),
region_id,
@@ -278,7 +297,8 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;
self.inner
let stream = self
.inner
.handle_read(
QueryRequest {
header: request.header,
@@ -287,7 +307,14 @@ impl RegionServer {
},
query_ctx,
)
.await
.await?;
if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
} else {
Ok(stream)
}
}
#[tracing::instrument(skip_all)]
@@ -748,6 +775,143 @@ impl RegionServer {
}
}
/// Tells whether the query asked for the region's committed sequence to be reported.
///
/// This is the case when the `FLOW_RETURN_REGION_SEQ` extension equals `"true"` (compared
/// ASCII case-insensitively), or when the `FLOW_INCREMENTAL_AFTER_SEQS` extension is present
/// with any value.
fn should_return_region_seq(query_ctx: &QueryContext) -> bool {
    let explicitly_requested = query_ctx
        .extension(FLOW_RETURN_REGION_SEQ)
        .is_some_and(|flag| flag.eq_ignore_ascii_case("true"));
    if explicitly_requested {
        return true;
    }

    // The mere presence of the incremental extension is enough; its value is not inspected here.
    query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some()
}
/// A record batch stream wrapper that carries one region's `(region id, sequence)` pair and
/// merges it into the reported metrics once the wrapped stream has been exhausted.
struct RegionWatermarkStream {
/// The wrapped stream; batches, schema, name and output ordering are taken from it.
stream: SendableRecordBatchStream,
/// `(region id as u64, latest sequence)`, as supplied to `new`.
region_latest_sequence: (u64, u64),
/// Starts as `false`; `poll_next` stores `true` when the wrapped stream yields
/// `Poll::Ready(None)`, and `metrics` reads it to decide whether to merge the pair.
finished: AtomicBool,
}
impl RegionWatermarkStream {
    /// Wraps `stream` and remembers `latest_sequence` as the watermark of `region_id`.
    ///
    /// The wrapper starts in the "not finished" state.
    fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self {
        let watermark = (region_id.as_u64(), latest_sequence);
        Self {
            stream,
            region_latest_sequence: watermark,
            finished: AtomicBool::new(false),
        }
    }

    /// Returns `metrics` with this wrapper's watermark upserted into
    /// `region_latest_sequences`.
    ///
    /// The list is created when absent. If an entry for the same region id already exists,
    /// its sequence is overwritten; otherwise a new entry is appended.
    fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
        let (own_region, own_seq) = self.region_latest_sequence;
        let entries = metrics.region_latest_sequences.get_or_insert_with(Vec::new);

        let existing = entries
            .iter()
            .position(|(region, _)| *region == own_region);
        match existing {
            Some(idx) => entries[idx].1 = own_seq,
            None => entries.push((own_region, own_seq)),
        }

        metrics
    }
}
impl RecordBatchStream for RegionWatermarkStream {
    /// Name of the wrapped stream.
    fn name(&self) -> &str {
        self.stream.name()
    }

    /// Schema of the wrapped stream.
    fn schema(&self) -> datatypes::schema::SchemaRef {
        self.stream.schema()
    }

    /// Output ordering of the wrapped stream.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.stream.output_ordering()
    }

    /// Metrics of the wrapped stream.
    ///
    /// Until the wrapped stream has been observed to end, the inner metrics are returned
    /// untouched. Afterwards the region watermark is merged in, starting from default metrics
    /// when the inner stream reports none.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        let inner_metrics = self.stream.metrics();
        if self.finished.load(Ordering::Relaxed) {
            Some(self.merged_metrics(inner_metrics.unwrap_or_default()))
        } else {
            inner_metrics
        }
    }
}
impl Stream for RegionWatermarkStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    /// Polls the wrapped stream and forwards its result unchanged.
    ///
    /// When the wrapped stream reports exhaustion (`Poll::Ready(None)`), the `finished` flag
    /// is set so that `metrics` starts including the region watermark.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polled = Pin::new(&mut self.stream).poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            self.finished.store(true, Ordering::Relaxed);
        }
        polled
    }

    /// Forwards the wrapped stream's size hint, so that wrapping does not degrade it to the
    /// trait default of `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}
#[cfg(test)]
mod watermark_tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_recordbatch::{RecordBatch, RecordBatches};
    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use futures_util::StreamExt;
    use session::context::QueryContextBuilder;

    use super::*;

    /// The region sequence is requested by either flow extension, and by neither when the
    /// context carries no extensions.
    #[test]
    fn test_should_return_region_seq() {
        // Explicit opt-in flag.
        let explicit_flag =
            HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]);
        let explicit_ctx = QueryContextBuilder::default()
            .extensions(explicit_flag)
            .build();
        assert!(should_return_region_seq(&explicit_ctx));

        // Incremental query: presence of the extension is sufficient.
        let incremental = HashMap::from([(
            FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
            r#"{"1":10}"#.to_string(),
        )]);
        let incremental_ctx = QueryContextBuilder::default()
            .extensions(incremental)
            .build();
        assert!(should_return_region_seq(&incremental_ctx));

        // No extensions at all.
        let plain_ctx = QueryContextBuilder::default().build();
        assert!(!should_return_region_seq(&plain_ctx));
    }

    /// The watermark must be absent before the stream is drained and present afterwards.
    #[tokio::test]
    async fn test_region_watermark_stream_only_sets_terminal_metrics() {
        let column = ColumnSchema::new("v", ConcreteDataType::int32_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column]));
        let column_values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let single_batch = RecordBatch::new(schema.clone(), vec![column_values]).unwrap();
        let inner = RecordBatches::try_new(schema, vec![single_batch])
            .unwrap()
            .as_stream();

        let region_id = RegionId::new(42, 7);
        let mut watermarked = Box::pin(RegionWatermarkStream::new(inner, region_id, 99));

        // Nothing has been polled yet, so no metrics are expected.
        assert!(watermarked.as_ref().get_ref().metrics().is_none());

        while watermarked.next().await.is_some() {}

        let terminal = watermarked.as_ref().get_ref().metrics().unwrap();
        assert_eq!(
            terminal.region_latest_sequences,
            Some(vec![(region_id.as_u64(), 99)])
        );
    }
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {

View File

@@ -399,7 +399,7 @@ impl ErrorExt for Error {
Error::PrometheusLabelValuesQueryPlan { source, .. } => source.status_code(),
Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
Error::CollectRecordbatch { source, .. } => source.status_code(),
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),

View File

@@ -31,6 +31,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -492,9 +493,12 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
OutputData::Stream(mut stream) => {
let schema = stream.schema();
let metrics = Arc::new(ArcSwapOption::from(stream.metrics().map(Arc::new)));
let metrics_ref = metrics.clone();
let s = Box::pin(stream! {
let mut start = tokio::time::Instant::now();
while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
metrics_ref.swap(stream.metrics().map(Arc::new));
yield item;
let now = tokio::time::Instant::now();
@@ -505,12 +509,13 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
StreamTimeoutSnafu.fail()?;
}
}
metrics_ref.swap(stream.metrics().map(Arc::new));
}) as Pin<Box<dyn Stream<Item = _> + Send>>;
let stream = RecordBatchStreamWrapper {
schema,
stream: s,
output_ordering: None,
metrics: Default::default(),
metrics,
span: Span::current(),
};
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
@@ -1131,12 +1136,22 @@ fn should_capture_statement(stmt: Option<&Statement>) -> bool {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};
use common_base::Plugins;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Int32Vector;
use futures::{Stream, StreamExt};
use query::query_engine::options::QueryOptions;
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
@@ -1381,4 +1396,109 @@ mod tests {
assert_eq!(result.is_ok(), is_ok);
}
}
/// Test double: a record batch stream that yields at most one batch and reports a fixed set
/// of metrics.
struct MockMetricsStream {
/// Schema returned by `schema()`.
schema: SchemaRef,
/// The single batch to yield; taken (left as `None`) by the first `poll_next`.
batch: Option<RecordBatch>,
/// Metrics cloned and returned by `metrics()`.
metrics: RecordBatchMetrics,
/// When `true`, `metrics()` returns `None` for as long as `batch` has not been taken.
terminal_metrics_only: bool,
}
impl Stream for MockMetricsStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    /// Always ready: yields the stored batch once, then `None` on every later poll.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let pending_batch = self.batch.take();
        Poll::Ready(pending_batch.map(Ok))
    }
}
impl RecordBatchStream for MockMetricsStream {
    /// A clone of the configured schema.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// The mock declares no output ordering.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    /// The configured metrics, withheld while in terminal-only mode and the batch has not
    /// been taken yet.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        let withheld = self.terminal_metrics_only && self.batch.is_some();
        (!withheld).then(|| self.metrics.clone())
    }
}
/// Metrics that the inner stream exposes from the start must be visible through the
/// timeout wrapper both before and after the stream is drained.
#[tokio::test]
async fn test_attach_timeout_preserves_stream_metrics() {
    let column = ColumnSchema::new("v", ConcreteDataType::int32_datatype(), false);
    let schema = Arc::new(Schema::new(vec![column]));
    let values = Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef;
    let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();

    let expected_sequences = Some(vec![(42, 99)]);
    let source = MockMetricsStream {
        schema,
        batch: Some(batch),
        metrics: RecordBatchMetrics {
            region_latest_sequences: expected_sequences.clone(),
            ..Default::default()
        },
        terminal_metrics_only: false,
    };

    let wrapped = attach_timeout(
        Output::new_with_stream(Box::pin(source)),
        Duration::from_secs(1),
    )
    .unwrap();
    let OutputData::Stream(mut stream) = wrapped.data else {
        panic!("expected stream output");
    };

    // Before consuming anything.
    let sequences_before = stream.metrics().and_then(|m| m.region_latest_sequences);
    assert_eq!(sequences_before, expected_sequences);

    while stream.next().await.is_some() {}

    // After the stream is exhausted.
    let sequences_after = stream.metrics().and_then(|m| m.region_latest_sequences);
    assert_eq!(sequences_after, expected_sequences);
}
/// Metrics that the inner stream only exposes once its batch has been taken must still
/// reach the caller through the timeout wrapper after draining.
#[tokio::test]
async fn test_attach_timeout_preserves_terminal_only_metrics() {
    let column = ColumnSchema::new("v", ConcreteDataType::int32_datatype(), false);
    let schema = Arc::new(Schema::new(vec![column]));
    let values = Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef;
    let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();

    let expected_sequences = Some(vec![(7, 123)]);
    let source = MockMetricsStream {
        schema,
        batch: Some(batch),
        metrics: RecordBatchMetrics {
            region_latest_sequences: expected_sequences.clone(),
            ..Default::default()
        },
        terminal_metrics_only: true,
    };

    let wrapped = attach_timeout(
        Output::new_with_stream(Box::pin(source)),
        Duration::from_secs(1),
    )
    .unwrap();
    let OutputData::Stream(mut stream) = wrapped.data else {
        panic!("expected stream output");
    };

    // The mock withholds metrics until its batch has been taken.
    assert!(stream.metrics().is_none());

    while stream.next().await.is_some() {}

    let sequences_after = stream.metrics().and_then(|m| m.region_latest_sequences);
    assert_eq!(sequences_after, expected_sequences);
}
}

View File

@@ -59,7 +59,8 @@ use crate::error::{
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED, RegionWatermarkMetricsStream};
use crate::options::FlowQueryExtensions;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
@@ -586,6 +587,11 @@ impl QueryExecutor for DatafusionQueryEngine {
plan: &Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
let explain_verbose = ctx.query_ctx().explain_verbose();
let should_collect_region_watermark =
FlowQueryExtensions::from_extensions(&ctx.query_ctx().extensions())
.map(|extensions| extensions.should_collect_region_watermark())
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let output_partitions = plan.properties().output_partitioning().partition_count();
if explain_verbose {
common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
@@ -621,7 +627,14 @@ impl QueryExecutor for DatafusionQueryEngine {
);
}
});
Ok(Box::pin(stream))
if should_collect_region_watermark {
Ok(Box::pin(RegionWatermarkMetricsStream::new(
Box::pin(stream),
plan.clone(),
)))
} else {
Ok(Box::pin(stream))
}
}
_ => {
// merge into a single partition
@@ -650,7 +663,14 @@ impl QueryExecutor for DatafusionQueryEngine {
);
}
});
Ok(Box::pin(stream))
if should_collect_region_watermark {
Ok(Box::pin(RegionWatermarkMetricsStream::new(
Box::pin(stream),
plan.clone(),
)))
} else {
Ok(Box::pin(stream))
}
}
}
}

View File

@@ -13,16 +13,20 @@
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::physical_plan::ExecutionPlan;
use datatypes::schema::SchemaRef;
use futures::Stream;
use futures_util::ready;
use lazy_static::lazy_static;
use prometheus::*;
use crate::dist_plan::MergeScanExec;
lazy_static! {
/// Timer of different stages in query.
pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
@@ -129,3 +133,72 @@ impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
self.stream.size_hint()
}
}
/// A record batch stream wrapper whose `metrics()` additionally reports the region latest
/// sequences gathered from the `MergeScanExec` nodes of the associated execution plan.
pub struct RegionWatermarkMetricsStream {
/// The wrapped stream; polling, schema, name and output ordering are delegated to it.
stream: SendableRecordBatchStream,
/// The plan that is walked by `metrics()` to find `MergeScanExec` nodes.
plan: Arc<dyn ExecutionPlan>,
}
impl RegionWatermarkMetricsStream {
pub fn new(stream: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { stream, plan }
}
}
impl RecordBatchStream for RegionWatermarkMetricsStream {
    /// Name of the wrapped stream.
    fn name(&self) -> &str {
        self.stream.name()
    }

    /// Schema of the wrapped stream.
    fn schema(&self) -> SchemaRef {
        self.stream.schema()
    }

    /// Output ordering of the wrapped stream.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.stream.output_ordering()
    }

    /// Metrics of the wrapped stream, with `region_latest_sequences` replaced by the
    /// sequences collected from the plan whenever any were collected.
    ///
    /// If the plan yields no sequences, the inner metrics (possibly `None`) are returned
    /// unchanged. If sequences were collected but the inner stream reports no metrics, the
    /// sequences are attached to default metrics instead of being dropped.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        let inner_metrics = self.stream.metrics();

        let region_latest_sequences = collect_region_latest_sequences(self.plan.clone());
        if region_latest_sequences.is_empty() {
            return inner_metrics;
        }

        // Do not bail out on `None` here: the collected watermarks must still be reported.
        let mut metrics = inner_metrics.unwrap_or_default();
        metrics.region_latest_sequences = Some(region_latest_sequences);
        Some(metrics)
    }
}
impl Stream for RegionWatermarkMetricsStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    /// Delegates polling to the wrapped stream without altering the result.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = Pin::new(&mut self.stream);
        inner.poll_next(cx)
    }

    /// Delegates the size hint to the wrapped stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let inner = &self.stream;
        inner.size_hint()
    }
}
/// Walks `plan` and all of its descendants, gathering the `region_latest_sequences` reported
/// in the sub-stage metrics of every `MergeScanExec` node.
///
/// Returns one `(region id, sequence)` pair per region id, sorted by region id. When the
/// same region id is reported more than once, the entry visited last replaces earlier ones.
// NOTE(review): "last visited wins" depends on traversal order; confirm that a region can
// never be reported with different sequences by different nodes, or pick min/max explicitly.
fn collect_region_latest_sequences(plan: Arc<dyn ExecutionPlan>) -> Vec<(u64, u64)> {
    // An ordered map keeps the entries unique per region and already sorted by region id.
    let mut latest_by_region = std::collections::BTreeMap::new();

    // Explicit stack instead of recursion; nodes are visited in pop order.
    let mut pending = vec![plan];
    while let Some(node) = pending.pop() {
        if let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanExec>() {
            let reported = merge_scan
                .sub_stage_metrics()
                .into_iter()
                .filter_map(|metrics| metrics.region_latest_sequences)
                .flatten();
            latest_by_region.extend(reported);
        }
        pending.extend(node.children().into_iter().cloned());
    }

    latest_by_region.into_iter().collect()
}

View File

@@ -157,6 +157,10 @@ impl FlowQueryExtensions {
Ok(self.incremental_after_seqs.is_some())
}
/// Whether region watermarks should be collected for this query: true when
/// `return_region_seq` is set or when `incremental_after_seqs` is present.
pub fn should_collect_region_watermark(&self) -> bool {
    if self.return_region_seq {
        return true;
    }
    self.incremental_after_seqs.is_some()
}
}
fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
@@ -368,4 +372,28 @@ mod flow_extension_tests {
let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap();
assert!(!apply_incremental);
}
/// Default extensions must not request watermark collection.
#[test]
fn test_should_collect_region_watermark_defaults_false() {
    let collects = FlowQueryExtensions::default().should_collect_region_watermark();
    assert!(!collects);
}
/// Setting only `return_region_seq` is enough to request watermark collection.
#[test]
fn test_should_collect_region_watermark_true_for_return_region_seq() {
    let extensions = FlowQueryExtensions {
        return_region_seq: true,
        ..Default::default()
    };
    let collects = extensions.should_collect_region_watermark();
    assert!(collects);
}
/// Providing only `incremental_after_seqs` is enough to request watermark collection.
#[test]
fn test_should_collect_region_watermark_true_for_incremental_query() {
    let after_seqs = HashMap::from([(1, 10)]);
    let extensions = FlowQueryExtensions {
        incremental_after_seqs: Some(after_seqs),
        ..Default::default()
    };
    let collects = extensions.should_collect_region_watermark();
    assert!(collects);
}
}

View File

@@ -703,7 +703,7 @@ impl ErrorExt for Error {
#[cfg(not(windows))]
UpdateJemallocMetrics { .. } => StatusCode::Internal,
CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
CollectRecordbatch { source, .. } => source.status_code(),
ExecuteQuery { source, .. }
| ExecutePlan { source, .. }

View File

@@ -23,10 +23,13 @@ mod test {
use auth::user_provider_from_option;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::flight::do_put::DoPutMetadata;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::OutputData;
use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::RecordBatchMetrics;
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Int32Vector, StringVector, TimestampMillisecondVector};
@@ -129,6 +132,141 @@ mod test {
| 1970-01-01T00:00:00.009 | -9 | s9 |
+-------------------------+----+----+";
query_and_expect(db.fe_instance().as_ref(), sql, expected).await;
let output = client.sql(sql).await.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stream output");
};
while let Some(batch) = stream.next().await {
batch.unwrap();
}
let metrics = stream.metrics().expect("expected terminal metrics");
assert!(metrics.region_latest_sequences.is_none());
let output = client
.sql_with_hint(sql, &[("flow.return_region_seq", "true")])
.await
.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stream output");
};
let mut row_count = 0;
while let Some(batch) = stream.next().await {
let batch = batch.unwrap();
row_count += batch.num_rows();
}
assert_eq!(row_count, 9);
let RecordBatchMetrics {
region_latest_sequences,
..
} = stream.metrics().expect("expected terminal metrics");
let region_latest_sequences =
region_latest_sequences.expect("expected region watermark metrics");
assert_eq!(region_latest_sequences.len(), 1);
assert!(region_latest_sequences[0].0 > 0);
assert!(region_latest_sequences[0].1 > 0);
let previous_watermark = region_latest_sequences[0];
let incremental_batches = create_record_batches(10);
test_put_record_batches(&client, incremental_batches).await;
let output = client
.sql_with_hint(
sql,
&[
("flow.incremental_mode", "memtable_only"),
(
"flow.incremental_after_seqs",
&format!(
r#"{{"{}":"{}"}}"#,
previous_watermark.0, previous_watermark.1
),
),
("flow.return_region_seq", "true"),
],
)
.await
.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stream output");
};
let schema = stream.schema();
let mut rows = Vec::new();
while let Some(batch) = stream.next().await {
rows.push(batch.unwrap());
}
let pretty = common_recordbatch::RecordBatches::try_new(schema, rows)
.unwrap()
.pretty_print()
.unwrap();
assert_eq!(
pretty,
"\
+-------------------------+-----+-----+
| ts | a | B |
+-------------------------+-----+-----+
| 1970-01-01T00:00:00.010 | -10 | s10 |
| 1970-01-01T00:00:00.011 | -11 | s11 |
| 1970-01-01T00:00:00.012 | -12 | s12 |
| 1970-01-01T00:00:00.013 | -13 | s13 |
| 1970-01-01T00:00:00.014 | -14 | s14 |
| 1970-01-01T00:00:00.015 | -15 | s15 |
| 1970-01-01T00:00:00.016 | -16 | s16 |
| 1970-01-01T00:00:00.017 | -17 | s17 |
| 1970-01-01T00:00:00.018 | -18 | s18 |
+-------------------------+-----+-----+"
);
let RecordBatchMetrics {
region_latest_sequences,
..
} = stream
.metrics()
.expect("expected terminal incremental metrics");
let region_latest_sequences =
region_latest_sequences.expect("expected incremental region watermark metrics");
assert_eq!(region_latest_sequences.len(), 1);
assert_eq!(region_latest_sequences[0].0, previous_watermark.0);
assert!(region_latest_sequences[0].1 > previous_watermark.1);
client.sql("admin flush_table('foo')").await.unwrap();
let output = client
.sql_with_hint(
sql,
&[
("flow.incremental_mode", "memtable_only"),
(
"flow.incremental_after_seqs",
&format!(
r#"{{"{}":"{}"}}"#,
previous_watermark.0, previous_watermark.1
),
),
("flow.return_region_seq", "true"),
],
)
.await
.unwrap();
let OutputData::Stream(mut stream) = output.data else {
panic!("expected stream output");
};
let err = loop {
match stream.next().await {
Some(Err(err)) => break err,
Some(Ok(_)) => continue,
None => panic!("expected stale incremental query to fail while consuming stream"),
}
};
assert_eq!(err.status_code(), StatusCode::EngineExecuteQuery);
let err_msg = format!("{err:?}");
assert!(err_msg.contains("STALE_CURSOR"));
assert!(err_msg.contains(&previous_watermark.0.to_string()));
assert!(err_msg.contains(&previous_watermark.1.to_string()));
}
async fn test_put_record_batches(client: &Database, record_batches: Vec<RecordBatch>) {