mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
refactor: collecting memory usage during scan (#2353)
* chore: try custom metrics * chore: fix header * chore: minor change
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod adapter;
|
||||
mod metrics;
|
||||
pub mod numbers;
|
||||
pub mod scan;
|
||||
|
||||
|
||||
64
src/table/src/table/metrics.rs
Normal file
64
src/table/src/table/metrics.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use datafusion::physical_plan::metrics::{
|
||||
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp,
|
||||
};
|
||||
|
||||
/// This metrics struct is used to record and hold the memory usage
|
||||
/// of the result batches in [`crate::table::scan::StreamWithMetricWrapper`]
|
||||
/// during query execution, indicating the size of the dataset.
///
/// Besides the memory gauge it also holds an output-row counter and an
/// end timestamp; all three are registered in `MemoryUsageMetrics::new`.
///
/// NOTE(review): this type derives `Clone` and also implements `Drop`, so
/// dropping any clone calls `try_done`. Whether clones share the same
/// underlying metric values depends on DataFusion's metric types — confirm
/// before cloning this struct.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MemoryUsageMetrics {
|
||||
// end time of the query; recorded by `try_done` (also invoked on drop)
end_time: Timestamp,
|
||||
// used memory in bytes; increased via `record_mem_usage`
|
||||
mem_used: Gauge,
|
||||
// number of rows in output; increased via `record_output`
|
||||
output_rows: Count,
|
||||
}
|
||||
|
||||
impl MemoryUsageMetrics {
|
||||
/// Create a new MemoryUsageMetrics structure, and set `start_time` to now.
///
/// Registers a start timestamp, an end timestamp, a `mem_used` gauge and an
/// `output_rows` counter on `metrics` for the given `partition`. The start
/// timestamp is recorded immediately and is not kept in the returned struct.
|
||||
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
|
||||
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
|
||||
start_time.record();
|
||||
|
||||
Self {
|
||||
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
|
||||
mem_used: MetricBuilder::new(metrics).mem_used(partition),
|
||||
output_rows: MetricBuilder::new(metrics).output_rows(partition),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add `mem_used` (in bytes) to the memory usage gauge.
pub fn record_mem_usage(&self, mem_used: usize) {
|
||||
self.mem_used.add(mem_used);
|
||||
}
|
||||
|
||||
/// Add `num_rows` to the output row counter.
pub fn record_output(&self, num_rows: usize) {
|
||||
self.output_rows.add(num_rows);
|
||||
}
|
||||
|
||||
/// Record the end time of the query.
///
/// Does nothing if an end time has already been recorded, so calling this
/// more than once keeps the first recorded value.
|
||||
pub fn try_done(&self) {
|
||||
if self.end_time.value().is_none() {
|
||||
self.end_time.record()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Records the end time when the metrics handle is dropped, so an end
/// timestamp is set even if `try_done` was never called explicitly.
impl Drop for MemoryUsageMetrics {
|
||||
fn drop(&mut self) {
|
||||
// `try_done` is a no-op when an end time has already been recorded.
self.try_done()
|
||||
}
|
||||
}
|
||||
@@ -24,12 +24,14 @@ use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
|
||||
use common_recordbatch::error::Result as RecordBatchResult;
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use datafusion::execution::context::TaskContext;
|
||||
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
|
||||
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
|
||||
use datafusion_physical_expr::PhysicalSortExpr;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures::{Stream, StreamExt};
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::table::metrics::MemoryUsageMetrics;
|
||||
|
||||
/// Adapt GreptimeDB's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan].
|
||||
pub struct StreamScanAdapter {
|
||||
stream: Mutex<Option<SendableRecordBatchStream>>,
|
||||
@@ -97,10 +99,10 @@ impl PhysicalPlan for StreamScanAdapter {
|
||||
) -> QueryResult<SendableRecordBatchStream> {
|
||||
let mut stream = self.stream.lock().unwrap();
|
||||
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
|
||||
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
|
||||
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
|
||||
Ok(Box::pin(StreamWithMetricWrapper {
|
||||
stream,
|
||||
metric: baseline_metric,
|
||||
metric: mem_usage_metrics,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -111,7 +113,7 @@ impl PhysicalPlan for StreamScanAdapter {
|
||||
|
||||
/// Wraps a [`SendableRecordBatchStream`] together with the metrics handle
/// that is updated while the stream is polled.
///
/// NOTE(review): this is a diff view, so the two `metric` lines below appear
/// to be the removed (`BaselineMetrics`) and added (`MemoryUsageMetrics`)
/// versions of the same field — confirm against the actual source file.
pub struct StreamWithMetricWrapper {
|
||||
stream: SendableRecordBatchStream,
|
||||
metric: BaselineMetrics,
|
||||
metric: MemoryUsageMetrics,
|
||||
}
|
||||
|
||||
impl Stream for StreamWithMetricWrapper {
|
||||
@@ -119,9 +121,16 @@ impl Stream for StreamWithMetricWrapper {
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
let _timer = this.metric.elapsed_compute().timer();
|
||||
let poll = this.stream.poll_next_unpin(cx);
|
||||
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
|
||||
if let Poll::Ready(Some(Ok(record_batch))) = &poll {
|
||||
let batch_mem_size = record_batch
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|vec_ref| vec_ref.memory_size())
|
||||
.sum::<usize>();
|
||||
// we don't record elapsed time here
|
||||
// since it's calling storage api involving I/O ops
|
||||
this.metric.record_mem_usage(batch_mem_size);
|
||||
this.metric.record_output(record_batch.num_rows());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user