mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
@@ -47,7 +47,7 @@ use common_telemetry::{error, warn};
|
||||
use futures::future;
|
||||
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||
use prost::Message;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
@@ -225,13 +225,9 @@ impl Stream for StreamWithMetrics {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let polled = Pin::new(&mut self.stream).poll_next(cx);
|
||||
match &polled {
|
||||
Poll::Ready(Some(_)) => self.sync_terminal_metrics(),
|
||||
Poll::Ready(None) => {
|
||||
self.sync_terminal_metrics();
|
||||
self.metrics.mark_ready();
|
||||
}
|
||||
Poll::Pending => {}
|
||||
if let Poll::Ready(None) = &polled {
|
||||
self.sync_terminal_metrics();
|
||||
self.metrics.mark_ready();
|
||||
}
|
||||
polled
|
||||
}
|
||||
@@ -286,12 +282,6 @@ where
|
||||
Some(FlightMessage::Metrics(s)) if terminal_metrics.get().is_none() => {
|
||||
terminal_metrics.update(Some(parse_terminal_metrics(&s)?));
|
||||
terminal_metrics.mark_ready();
|
||||
ensure!(
|
||||
flight_message_stream.next().await.is_none(),
|
||||
IllegalFlightMessagesSnafu {
|
||||
reason: "Expect 'AffectedRows' Flight messages to be followed by at most one Metrics message"
|
||||
}
|
||||
);
|
||||
}
|
||||
Some(FlightMessage::Metrics(_)) => {
|
||||
return IllegalFlightMessagesSnafu {
|
||||
|
||||
Reference in New Issue
Block a user