From 01e0df2c3a421d345f8af4bd5739fc0ad773862f Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 30 Nov 2023 17:48:34 +0800 Subject: [PATCH] feat: logs --- src/mito2/src/read.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index c3ce229780..2e4a287c83 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -22,6 +22,7 @@ pub(crate) mod seq_scan; use std::collections::HashSet; use std::sync::Arc; +use std::time::{Duration, Instant}; use api::v1::OpType; use async_trait::async_trait; @@ -671,7 +672,7 @@ pub enum Source { /// Source from a [BoxedBatchIterator]. Iter(BoxedBatchIterator), /// Source from a [BoxedBatchStream]. - Stream(BoxedBatchStream), + Stream(BoxedBatchStream, Duration), } impl Source { @@ -680,7 +681,20 @@ impl Source { match self { Source::Reader(reader) => reader.next_batch().await, Source::Iter(iter) => iter.next().transpose(), - Source::Stream(stream) => stream.try_next().await, + Source::Stream(stream, duration) => { + let start = Instant::now(); + let ret = stream.try_next().await; + *duration += start.elapsed(); + ret + } + } + } +} + +impl Drop for Source { + fn drop(&mut self) { + if let Source::Stream(s, du) = &self { + common_telemetry::debug!("Source fetch time {:?}", du); } } }