diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 3a9e2b7330..c353c41201 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -29,7 +29,7 @@ mod test { use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::OutputData; use common_recordbatch::RecordBatch; - use common_recordbatch::adapter::RecordBatchMetrics; + use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Int32Vector, StringVector, TimestampMillisecondVector}; @@ -142,7 +142,7 @@ mod test { batch.unwrap(); } let metrics = stream.metrics().expect("expected terminal metrics"); - assert!(metrics.region_latest_sequences.is_none()); + assert!(metrics.region_watermarks.is_empty()); let result = client .sql_with_terminal_metrics(sql, &[("flow.return_region_seq", "true")]) @@ -180,10 +180,14 @@ mod test { assert_eq!(row_count, 9); let metrics = stream.metrics().expect("expected terminal metrics"); - let region_latest_sequences = metrics - .region_latest_sequences - .expect("expected region watermark metrics"); - assert_eq!(region_latest_sequences, vec![expected_watermark]); + let region_watermarks = metrics.region_watermarks; + assert_eq!( + region_watermarks, + vec![RegionWatermarkEntry { + region_id: expected_watermark.0, + watermark: Some(expected_watermark.1), + }] + ); let previous_watermark = expected_watermark; @@ -279,13 +283,10 @@ mod test { ); let RecordBatchMetrics { - region_latest_sequences, - .. + region_watermarks, .. } = stream .metrics() .expect("expected terminal incremental metrics"); - let region_latest_sequences = - region_latest_sequences.expect("expected incremental region watermark metrics"); let regions = db.list_all_regions().await; let region = regions .get(&RegionId::from_u64(previous_watermark.0)) @@ -293,8 +294,11 @@ mod test { let expected_incremental_watermark = (previous_watermark.0, region.find_committed_sequence()); assert_eq!( - region_latest_sequences, - vec![expected_incremental_watermark] + region_watermarks, + vec![RegionWatermarkEntry { + region_id: expected_incremental_watermark.0, + watermark: Some(expected_incremental_watermark.1), + }] ); client.sql("admin flush_table('foo')").await.unwrap();