From b1219fa4567d683ed48552e692e5e93015604804 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 5 Jul 2024 16:39:35 +0800 Subject: [PATCH] feat: refine scan metrics logging (#4296) * fix: collect scan cost in row group reader * feat: remove log after scan * feat: collect prepare scan cost before fetching readers * print first poll elapsed Signed-off-by: Ruihang Xia * feat: print more first poll --------- Signed-off-by: Ruihang Xia Co-authored-by: Ruihang Xia --- src/mito2/src/read/scan_region.rs | 6 +----- src/mito2/src/read/seq_scan.rs | 17 +++++++++++------ src/mito2/src/read/unordered_scan.rs | 9 +++++---- src/mito2/src/sst/parquet/reader.rs | 8 ++++---- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 48afb2d009..ce4f789b9c 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -16,7 +16,7 @@ use std::fmt; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; @@ -782,21 +782,17 @@ pub(crate) struct StreamContext { // Metrics: /// The start time of the query. pub(crate) query_start: Instant, - /// Time elapsed before creating the scanner. - pub(crate) prepare_scan_cost: Duration, } impl StreamContext { /// Creates a new [StreamContext]. pub(crate) fn new(input: ScanInput) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); - let prepare_scan_cost = query_start.elapsed(); Self { input, parts: Mutex::new(ScanPartList::default()), query_start, - prepare_scan_cost, } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9a0038135f..7204bf87e7 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -90,7 +90,7 @@ impl SeqScan { /// Builds a [BoxedBatchReader] from sequential scan for compaction. pub async fn build_reader(&self) -> Result { let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), ..Default::default() }; let maybe_reader = @@ -247,13 +247,15 @@ impl SeqScan { } let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); let semaphore = self.semaphore.clone(); let partition_ranges = self.properties.partitions[partition].clone(); let stream = try_stream! { + let first_poll = stream_ctx.query_start.elapsed(); + for partition_range in partition_ranges { let maybe_reader = Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics) @@ -287,10 +289,11 @@ impl SeqScan { metrics.observe_metrics_on_finish(); debug!( - "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}", + "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}", stream_ctx.input.mapper.metadata().region_id, partition, metrics, + first_poll, ); } }; @@ -321,7 +324,7 @@ impl SeqScan { )); } let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); @@ -329,6 +332,8 @@ impl SeqScan { // build stream let stream = try_stream! { + let first_poll = stream_ctx.query_start.elapsed(); + // init parts let parts_len = { let mut parts = stream_ctx.parts.lock().await; @@ -375,10 +380,11 @@ impl SeqScan { metrics.observe_metrics_on_finish(); debug!( - "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}", + "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}", stream_ctx.input.mapper.metadata().region_id, partition, metrics, + first_poll ); } }; @@ -444,7 +450,6 @@ impl fmt::Debug for SeqScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SeqScan") .field("parts", &self.stream_ctx.parts) - .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost) .finish() } } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 62a173d86c..663097012b 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -141,11 +141,13 @@ impl RegionScanner for UnorderedScan { fn scan_partition(&self, partition: usize) -> Result { let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); let stream = try_stream! { + let first_poll = stream_ctx.query_start.elapsed(); + let mut parts = stream_ctx.parts.lock().await; maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics) .await @@ -196,8 +198,8 @@ impl RegionScanner for UnorderedScan { metrics.total_cost = query_start.elapsed(); metrics.observe_metrics_on_finish(); debug!( - "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}", - partition, mapper.metadata().region_id, metrics, reader_metrics + "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}", + partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, ); }; let stream = Box::pin(RecordBatchStreamWrapper::new( @@ -220,7 +222,6 @@ impl fmt::Debug for UnorderedScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("UnorderedScan") .field("parts", &self.stream_ctx.parts) - .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost) .finish() } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 98ef033313..32a24f682c 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -729,10 +729,8 @@ impl BatchReader for ParquetReader { return Ok(None); }; - let start = Instant::now(); // We don't collect the elapsed time if the reader returns an error. if let Some(batch) = reader.next_batch().await? { - reader.metrics.scan_cost += start.elapsed(); return Ok(Some(batch)); } @@ -746,13 +744,11 @@ impl BatchReader for ParquetReader { // Resets the parquet reader. reader.reset_reader(parquet_reader); if let Some(batch) = reader.next_batch().await? { - reader.metrics.scan_cost += start.elapsed(); return Ok(Some(batch)); } } // The reader is exhausted. - reader.metrics.scan_cost += start.elapsed(); self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics)); Ok(None) } @@ -874,14 +870,17 @@ impl RowGroupReader { /// Tries to fetch next [Batch] from the reader. pub(crate) async fn next_batch(&mut self) -> Result> { + let scan_start = Instant::now(); if let Some(batch) = self.batches.pop_front() { self.metrics.num_rows += batch.num_rows(); + self.metrics.scan_cost += scan_start.elapsed(); return Ok(Some(batch)); } // We need to fetch next record batch and convert it to batches. while self.batches.is_empty() { let Some(record_batch) = self.fetch_next_record_batch()? else { + self.metrics.scan_cost += scan_start.elapsed(); return Ok(None); }; self.metrics.num_record_batches += 1; @@ -894,6 +893,7 @@ impl RowGroupReader { } let batch = self.batches.pop_front(); self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + self.metrics.scan_cost += scan_start.elapsed(); Ok(batch) }