mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: time poll elapsed for RegionScan plan (#4482)
* feat: time poll elapsed for RegionScan plan

  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* also record await time

  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -12,23 +12,30 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use datafusion::physical_plan::metrics::{
|
||||
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp,
|
||||
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, ScopedTimerGuard, Time, Timestamp,
|
||||
};
|
||||
|
||||
/// This metrics struct is used to record and hold memory usage
|
||||
/// This metrics struct is used to record and hold metrics like memory usage
|
||||
/// of result batch in [`crate::table::scan::StreamWithMetricWrapper`]
|
||||
/// during query execution, indicating size of the dataset.
|
||||
/// during query execution.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MemoryUsageMetrics {
|
||||
pub struct StreamMetrics {
|
||||
/// Timestamp when the stream finished
|
||||
end_time: Timestamp,
|
||||
// used memory in bytes
|
||||
/// Used memory in bytes
|
||||
mem_used: Gauge,
|
||||
// number of rows in output
|
||||
/// Number of rows in output
|
||||
output_rows: Count,
|
||||
/// Elapsed time used to `poll` the stream
|
||||
poll_elapsed: Time,
|
||||
/// Elapsed time spent `.await`ing the stream
|
||||
await_elapsed: Time,
|
||||
}
|
||||
|
||||
impl MemoryUsageMetrics {
|
||||
impl StreamMetrics {
|
||||
/// Create a new StreamMetrics structure, and set `start_time` to now
|
||||
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
|
||||
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
|
||||
@@ -38,6 +45,8 @@ impl MemoryUsageMetrics {
|
||||
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
|
||||
mem_used: MetricBuilder::new(metrics).mem_used(partition),
|
||||
output_rows: MetricBuilder::new(metrics).output_rows(partition),
|
||||
poll_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_poll", partition),
|
||||
await_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_await", partition),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,9 +64,18 @@ impl MemoryUsageMetrics {
|
||||
self.end_time.record()
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a timer guard that records the time elapsed in poll
|
||||
pub fn poll_timer(&self) -> ScopedTimerGuard {
|
||||
self.poll_elapsed.timer()
|
||||
}
|
||||
|
||||
pub fn record_await_duration(&self, duration: Duration) {
|
||||
self.await_elapsed.add_duration(duration);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MemoryUsageMetrics {
|
||||
impl Drop for StreamMetrics {
|
||||
fn drop(&mut self) {
|
||||
self.try_done()
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::any::Any;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Instant;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
|
||||
@@ -34,7 +35,7 @@ use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
|
||||
use futures::{Stream, StreamExt};
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
|
||||
use crate::table::metrics::MemoryUsageMetrics;
|
||||
use crate::table::metrics::StreamMetrics;
|
||||
|
||||
/// A plan to read multiple partitions from a region of a table.
|
||||
#[derive(Debug)]
|
||||
@@ -139,11 +140,12 @@ impl ExecutionPlan for RegionScanExec {
|
||||
.unwrap()
|
||||
.scan_partition(partition)
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
|
||||
let stream_metrics = StreamMetrics::new(&self.metric, partition);
|
||||
Ok(Box::pin(StreamWithMetricWrapper {
|
||||
stream,
|
||||
metric: mem_usage_metrics,
|
||||
metric: stream_metrics,
|
||||
span,
|
||||
await_timer: None,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -164,8 +166,9 @@ impl DisplayAs for RegionScanExec {
|
||||
|
||||
pub struct StreamWithMetricWrapper {
|
||||
stream: SendableRecordBatchStream,
|
||||
metric: MemoryUsageMetrics,
|
||||
metric: StreamMetrics,
|
||||
span: Span,
|
||||
await_timer: Option<Instant>,
|
||||
}
|
||||
|
||||
impl Stream for StreamWithMetricWrapper {
|
||||
@@ -174,22 +177,32 @@ impl Stream for StreamWithMetricWrapper {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
let _enter = this.span.enter();
|
||||
match this.stream.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(result)) => match result {
|
||||
Ok(record_batch) => {
|
||||
let batch_mem_size = record_batch
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|vec_ref| vec_ref.memory_size())
|
||||
.sum::<usize>();
|
||||
// we don't record elapsed time here
|
||||
// since it's calling storage api involving I/O ops
|
||||
this.metric.record_mem_usage(batch_mem_size);
|
||||
this.metric.record_output(record_batch.num_rows());
|
||||
Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
|
||||
let poll_timer = this.metric.poll_timer();
|
||||
this.await_timer.get_or_insert(Instant::now());
|
||||
let poll_result = this.stream.poll_next_unpin(cx);
|
||||
drop(poll_timer);
|
||||
match poll_result {
|
||||
Poll::Ready(Some(result)) => {
|
||||
if let Some(instant) = this.await_timer.take() {
|
||||
let elapsed = instant.elapsed();
|
||||
this.metric.record_await_duration(elapsed);
|
||||
}
|
||||
Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
|
||||
},
|
||||
match result {
|
||||
Ok(record_batch) => {
|
||||
let batch_mem_size = record_batch
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|vec_ref| vec_ref.memory_size())
|
||||
.sum::<usize>();
|
||||
// we don't record elapsed time here
|
||||
// since it's calling storage api involving I/O ops
|
||||
this.metric.record_mem_usage(batch_mem_size);
|
||||
this.metric.record_output(record_batch.num_rows());
|
||||
Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
|
||||
}
|
||||
Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user