From ca54b05be30ed3b78c598e11d3eca92ad3fb85c8 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Thu, 1 Aug 2024 20:19:15 +0800
Subject: [PATCH] feat: time poll elapsed for RegionScan plan (#4482)

* feat: time poll elapsed for RegionScan plan

Signed-off-by: Ruihang Xia

* also record await time

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 src/table/src/table/metrics.rs | 34 +++++++++++++++++------
 src/table/src/table/scan.rs    | 51 +++++++++++++++++++++-------------
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/src/table/src/table/metrics.rs b/src/table/src/table/metrics.rs
index e24f0ff90a..5fbdf641f5 100644
--- a/src/table/src/table/metrics.rs
+++ b/src/table/src/table/metrics.rs
@@ -12,23 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Duration;
+
 use datafusion::physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp,
+    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, ScopedTimerGuard, Time, Timestamp,
 };
 
-/// This metrics struct is used to record and hold memory usage
+/// This metrics struct is used to record and hold metrics like memory usage
 /// of result batch in [`crate::table::scan::StreamWithMetricWrapper`]
-/// during query execution, indicating size of the dataset.
+/// during query execution.
 #[derive(Debug, Clone)]
-pub struct MemoryUsageMetrics {
+pub struct StreamMetrics {
+    /// Timestamp when the stream finished
     end_time: Timestamp,
-    // used memory in bytes
+    /// Used memory in bytes
     mem_used: Gauge,
-    // number of rows in output
+    /// Number of rows in output
     output_rows: Count,
+    /// Elapsed time used to `poll` the stream
+    poll_elapsed: Time,
+    /// Elapsed time used to `.await`ing the stream
+    await_elapsed: Time,
 }
 
-impl MemoryUsageMetrics {
+impl StreamMetrics {
     /// Create a new MemoryUsageMetrics structure, and set `start_time` to now
     pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
         let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
@@ -38,6 +45,8 @@ impl MemoryUsageMetrics {
             end_time: MetricBuilder::new(metrics).end_timestamp(partition),
             mem_used: MetricBuilder::new(metrics).mem_used(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
+            poll_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_poll", partition),
+            await_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_await", partition),
         }
     }
 
@@ -55,9 +64,18 @@ impl MemoryUsageMetrics {
             self.end_time.record()
         }
     }
+
+    /// Return a timer guard that records the time elapsed in poll
+    pub fn poll_timer(&self) -> ScopedTimerGuard {
+        self.poll_elapsed.timer()
+    }
+
+    pub fn record_await_duration(&self, duration: Duration) {
+        self.await_elapsed.add_duration(duration);
+    }
 }
 
-impl Drop for MemoryUsageMetrics {
+impl Drop for StreamMetrics {
     fn drop(&mut self) {
         self.try_done()
     }
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 70e76586b6..02c1147875 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -16,6 +16,7 @@ use std::any::Any;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
+use std::time::Instant;
 
 use common_error::ext::BoxedError;
 use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
@@ -34,7 +35,7 @@ use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use futures::{Stream, StreamExt};
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
 
-use crate::table::metrics::MemoryUsageMetrics;
+use crate::table::metrics::StreamMetrics;
 
 /// A plan to read multiple partitions from a region of a table.
 #[derive(Debug)]
@@ -139,11 +140,12 @@ impl ExecutionPlan for RegionScanExec {
             .unwrap()
             .scan_partition(partition)
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
-        let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
+        let stream_metrics = StreamMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
-            metric: mem_usage_metrics,
+            metric: stream_metrics,
             span,
+            await_timer: None,
         }))
     }
 
@@ -164,8 +166,9 @@ impl DisplayAs for RegionScanExec {
 
 pub struct StreamWithMetricWrapper {
     stream: SendableRecordBatchStream,
-    metric: MemoryUsageMetrics,
+    metric: StreamMetrics,
     span: Span,
+    await_timer: Option<Instant>,
 }
 
 impl Stream for StreamWithMetricWrapper {
@@ -174,22 +177,32 @@ impl Stream for StreamWithMetricWrapper {
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
         let _enter = this.span.enter();
-        match this.stream.poll_next_unpin(cx) {
-            Poll::Ready(Some(result)) => match result {
-                Ok(record_batch) => {
-                    let batch_mem_size = record_batch
-                        .columns()
-                        .iter()
-                        .map(|vec_ref| vec_ref.memory_size())
-                        .sum::<usize>();
-                    // we don't record elapsed time here
-                    // since it's calling storage api involving I/O ops
-                    this.metric.record_mem_usage(batch_mem_size);
-                    this.metric.record_output(record_batch.num_rows());
-                    Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
+        let poll_timer = this.metric.poll_timer();
+        this.await_timer.get_or_insert(Instant::now());
+        let poll_result = this.stream.poll_next_unpin(cx);
+        drop(poll_timer);
+        match poll_result {
+            Poll::Ready(Some(result)) => {
+                if let Some(instant) = this.await_timer.take() {
+                    let elapsed = instant.elapsed();
+                    this.metric.record_await_duration(elapsed);
                 }
-                Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
-            },
+                match result {
+                    Ok(record_batch) => {
+                        let batch_mem_size = record_batch
+                            .columns()
+                            .iter()
+                            .map(|vec_ref| vec_ref.memory_size())
+                            .sum::<usize>();
+                        // we don't record elapsed time here
+                        // since it's calling storage api involving I/O ops
+                        this.metric.record_mem_usage(batch_mem_size);
+                        this.metric.record_output(record_batch.num_rows());
+                        Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
+                    }
+                    Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
+                }
+            }
             Poll::Ready(None) => Poll::Ready(None),
             Poll::Pending => Poll::Pending,
         }