From 1d73ad2a2fb96b3209985d1cd764ceea5e095e4f Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 30 Nov 2023 15:37:47 +0800 Subject: [PATCH] feat: add fetch cost metrics --- src/mito2/src/read/merge.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index f14b6da3c3..511b872ad4 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -375,6 +375,8 @@ struct Metrics { num_output_rows: usize, /// Number of deleted rows. num_deleted_rows: usize, + /// Cost to fetch batches from sources. + fetch_cost: Duration, } /// Helper to collect and merge small batches for same primary key. @@ -476,7 +478,9 @@ impl Node { /// It tries to fetch one batch from the `source`. async fn new(mut source: Source, metrics: &mut Metrics) -> Result { // Ensures batch is not empty. + let start = Instant::now(); let current_batch = source.next_batch().await?.map(CompareFirst); + metrics.fetch_cost += start.elapsed(); metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0); Ok(Node { @@ -513,8 +517,10 @@ impl Node { /// Panics if the node has reached EOF. async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result { let current = self.current_batch.take().unwrap(); + let start = Instant::now(); // Ensures batch is not empty. self.current_batch = self.source.next_batch().await?.map(CompareFirst); + metrics.fetch_cost += start.elapsed(); metrics.num_input_rows += self .current_batch .as_ref()