mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
@@ -29,7 +29,7 @@ mod test {
|
||||
use common_grpc::flight::{FlightEncoder, FlightMessage};
|
||||
use common_query::OutputData;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::{Int32Vector, StringVector, TimestampMillisecondVector};
|
||||
@@ -142,7 +142,7 @@ mod test {
|
||||
batch.unwrap();
|
||||
}
|
||||
let metrics = stream.metrics().expect("expected terminal metrics");
|
||||
assert!(metrics.region_latest_sequences.is_none());
|
||||
assert!(metrics.region_watermarks.is_empty());
|
||||
|
||||
let result = client
|
||||
.sql_with_terminal_metrics(sql, &[("flow.return_region_seq", "true")])
|
||||
@@ -180,10 +180,14 @@ mod test {
|
||||
assert_eq!(row_count, 9);
|
||||
|
||||
let metrics = stream.metrics().expect("expected terminal metrics");
|
||||
let region_latest_sequences = metrics
|
||||
.region_latest_sequences
|
||||
.expect("expected region watermark metrics");
|
||||
assert_eq!(region_latest_sequences, vec![expected_watermark]);
|
||||
let region_watermarks = metrics.region_watermarks;
|
||||
assert_eq!(
|
||||
region_watermarks,
|
||||
vec![RegionWatermarkEntry {
|
||||
region_id: expected_watermark.0,
|
||||
watermark: Some(expected_watermark.1),
|
||||
}]
|
||||
);
|
||||
|
||||
let previous_watermark = expected_watermark;
|
||||
|
||||
@@ -279,13 +283,10 @@ mod test {
|
||||
);
|
||||
|
||||
let RecordBatchMetrics {
|
||||
region_latest_sequences,
|
||||
..
|
||||
region_watermarks, ..
|
||||
} = stream
|
||||
.metrics()
|
||||
.expect("expected terminal incremental metrics");
|
||||
let region_latest_sequences =
|
||||
region_latest_sequences.expect("expected incremental region watermark metrics");
|
||||
let regions = db.list_all_regions().await;
|
||||
let region = regions
|
||||
.get(&RegionId::from_u64(previous_watermark.0))
|
||||
@@ -293,8 +294,11 @@ mod test {
|
||||
let expected_incremental_watermark =
|
||||
(previous_watermark.0, region.find_committed_sequence());
|
||||
assert_eq!(
|
||||
region_latest_sequences,
|
||||
vec![expected_incremental_watermark]
|
||||
region_watermarks,
|
||||
vec![RegionWatermarkEntry {
|
||||
region_id: expected_incremental_watermark.0,
|
||||
watermark: Some(expected_incremental_watermark.1),
|
||||
}]
|
||||
);
|
||||
|
||||
client.sql("admin flush_table('foo')").await.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user