chore: add timer to ready poll

This commit is contained in:
shuiyisong
2023-06-25 12:34:50 +08:00
parent 138032672d
commit 501b1940f3

View File

@@ -116,7 +116,6 @@ impl Stream for DfRecordBatchStreamAdapter {
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
#[allow(dead_code)]
metrics: Option<BaselineMetrics>,
}
@@ -155,16 +154,16 @@ impl Stream for RecordBatchStreamAdapter {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// DEBUG: remove timer
// let timer = self
// .metrics
// .as_ref()
// .map(|m| m.elapsed_compute().clone())
// .unwrap_or_default();
// let _guard = timer.timer();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {
let timer = self
.metrics
.as_ref()
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
let df_record_batch = df_record_batch.context(error::PollStreamSnafu)?;
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
self.schema(),