feat: refine scan metrics logging (#4296)

* fix: collect scan cost in row group reader

* feat: remove log after scan

* feat: collect prepare scan cost before fetching readers

* print first poll elapsed

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: print more first poll

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Yingwen
2024-07-05 16:39:35 +08:00
committed by GitHub
parent 4f0984c1d7
commit b1219fa456
4 changed files with 21 additions and 19 deletions

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
@@ -782,21 +782,17 @@ pub(crate) struct StreamContext {
// Metrics:
/// The start time of the query.
pub(crate) query_start: Instant,
/// Time elapsed before creating the scanner.
pub(crate) prepare_scan_cost: Duration,
}
impl StreamContext {
/// Creates a new [StreamContext].
pub(crate) fn new(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let prepare_scan_cost = query_start.elapsed();
Self {
input,
parts: Mutex::new(ScanPartList::default()),
query_start,
prepare_scan_cost,
}
}

View File

@@ -90,7 +90,7 @@ impl SeqScan {
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let maybe_reader =
@@ -247,13 +247,15 @@ impl SeqScan {
}
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
@@ -287,10 +289,11 @@ impl SeqScan {
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
first_poll,
);
}
};
@@ -321,7 +324,7 @@ impl SeqScan {
));
}
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
@@ -329,6 +332,8 @@ impl SeqScan {
// build stream
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
// init parts
let parts_len = {
let mut parts = stream_ctx.parts.lock().await;
@@ -375,10 +380,11 @@ impl SeqScan {
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
first_poll
);
}
};
@@ -444,7 +450,6 @@ impl fmt::Debug for SeqScan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SeqScan")
.field("parts", &self.stream_ctx.parts)
.field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost)
.finish()
}
}

View File

@@ -141,11 +141,13 @@ impl RegionScanner for UnorderedScan {
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics)
.await
@@ -196,8 +198,8 @@ impl RegionScanner for UnorderedScan {
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -220,7 +222,6 @@ impl fmt::Debug for UnorderedScan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnorderedScan")
.field("parts", &self.stream_ctx.parts)
.field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost)
.finish()
}
}

View File

@@ -729,10 +729,8 @@ impl BatchReader for ParquetReader {
return Ok(None);
};
let start = Instant::now();
// We don't collect the elapsed time if the reader returns an error.
if let Some(batch) = reader.next_batch().await? {
reader.metrics.scan_cost += start.elapsed();
return Ok(Some(batch));
}
@@ -746,13 +744,11 @@ impl BatchReader for ParquetReader {
// Resets the parquet reader.
reader.reset_reader(parquet_reader);
if let Some(batch) = reader.next_batch().await? {
reader.metrics.scan_cost += start.elapsed();
return Ok(Some(batch));
}
}
// The reader is exhausted.
reader.metrics.scan_cost += start.elapsed();
self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
Ok(None)
}
@@ -874,14 +870,17 @@ impl RowGroupReader {
/// Tries to fetch next [Batch] from the reader.
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
self.metrics.num_rows += batch.num_rows();
self.metrics.scan_cost += scan_start.elapsed();
return Ok(Some(batch));
}
// We need to fetch next record batch and convert it to batches.
while self.batches.is_empty() {
let Some(record_batch) = self.fetch_next_record_batch()? else {
self.metrics.scan_cost += scan_start.elapsed();
return Ok(None);
};
self.metrics.num_record_batches += 1;
@@ -894,6 +893,7 @@ impl RowGroupReader {
}
let batch = self.batches.pop_front();
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
self.metrics.scan_cost += scan_start.elapsed();
Ok(batch)
}