From 501b1940f353d2c30dfc9c12b8b760676909a4bf Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Sun, 25 Jun 2023 12:34:50 +0800 Subject: [PATCH] chore: add timer to ready poll --- src/common/recordbatch/src/adapter.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index b03f66d1a1..4a5dfdcc13 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -116,7 +116,6 @@ impl Stream for DfRecordBatchStreamAdapter { pub struct RecordBatchStreamAdapter { schema: SchemaRef, stream: DfSendableRecordBatchStream, - #[allow(dead_code)] metrics: Option, } @@ -155,16 +154,16 @@ impl Stream for RecordBatchStreamAdapter { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // DEBUG: remove timer - // let timer = self - // .metrics - // .as_ref() - // .map(|m| m.elapsed_compute().clone()) - // .unwrap_or_default(); - // let _guard = timer.timer(); match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { + let timer = self + .metrics + .as_ref() + .map(|m| m.elapsed_compute().clone()) + .unwrap_or_default(); + let _guard = timer.timer(); + let df_record_batch = df_record_batch.context(error::PollStreamSnafu)?; Poll::Ready(Some(RecordBatch::try_from_df_record_batch( self.schema(),