feat(promql): add series count metrics (#5534)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
Author: Ruihang Xia
Date: 2025-02-13 23:49:28 -08:00
Committed by: GitHub
Parent: 858dae7b23
Commit: 7bbc87b3c0
5 changed files with 105 additions and 30 deletions

View File

@@ -36,3 +36,5 @@ pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
 pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};
 pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
+const METRIC_NUM_SERIES: &str = "num_series";
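
Note: all four physical operators touched below wire the new counter up the same way. Their `execute()` registers a `Count` named by `METRIC_NUM_SERIES` on the operator's `ExecutionPlanMetricsSet` and hands a clone to the stream, which bumps it once per series. A minimal sketch of that shared pattern (the helper name is illustrative and not part of this commit; in the diff this code is inlined in each `execute()`):

```rust
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue,
};

const METRIC_NUM_SERIES: &str = "num_series";

/// Register a per-partition "num_series" counter and return a handle to it.
fn register_num_series(metric: &ExecutionPlanMetricsSet, partition: usize) -> Count {
    let num_series = Count::new();
    MetricBuilder::new(metric)
        .with_partition(partition)
        .build(MetricValue::Count {
            name: METRIC_NUM_SERIES.into(),
            count: num_series.clone(),
        });
    // The stream keeps this handle, calls `add(1)` per series, and reads the
    // accumulated total back with `num_series.value()` when the input ends.
    num_series
}
```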

View File

@@ -26,20 +26,23 @@ use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
+};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datatypes::arrow::compute;
 use datatypes::arrow::error::Result as ArrowResult;
-use futures::{Stream, StreamExt};
+use futures::{ready, Stream, StreamExt};
 use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
+use crate::metrics::PROMQL_SERIES_COUNT;
 /// Manipulate the input record batch to make it suitable for Instant Operator.
 ///
@@ -242,6 +245,14 @@ impl ExecutionPlan for InstantManipulateExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let metrics_builder = MetricBuilder::new(&self.metric);
+        let num_series = Count::new();
+        metrics_builder
+            .with_partition(partition)
+            .build(MetricValue::Count {
+                name: METRIC_NUM_SERIES.into(),
+                count: num_series.clone(),
+            });
         let input = self.input.execute(partition, context)?;
         let schema = input.schema();
@@ -264,6 +275,7 @@ impl ExecutionPlan for InstantManipulateExec {
             schema,
             input,
             metric: baseline_metric,
+            num_series,
         }))
     }
@@ -326,6 +338,8 @@ pub struct InstantManipulateStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
+    /// Number of series processed.
+    num_series: Count,
 }
 impl RecordBatchStream for InstantManipulateStream {
@@ -338,12 +352,19 @@ impl Stream for InstantManipulateStream {
     type Item = DataFusionResult<RecordBatch>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let poll = match self.input.poll_next_unpin(cx) {
-            Poll::Ready(batch) => {
-                let _timer = self.metric.elapsed_compute().timer();
-                Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch))))
+        let timer = std::time::Instant::now();
+        let poll = match ready!(self.input.poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                self.num_series.add(1);
+                let result = Ok(batch).and_then(|batch| self.manipulate(batch));
+                self.metric.elapsed_compute().add_elapsed(timer);
+                Poll::Ready(Some(result))
             }
-            Poll::Pending => Poll::Pending,
+            None => {
+                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
+                Poll::Ready(None)
+            }
+            Some(Err(e)) => Poll::Ready(Some(Err(e))),
         };
         self.metric.record_poll(poll)
     }
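
Note: the stream side of every operator follows one idiom. Each `Ok` batch from the input is counted as one series (these operators assume each input batch holds a single time series, as the `SeriesNormalize` doc comment below states), and when the input returns `None` the per-partition total is flushed once into the `PROMQL_SERIES_COUNT` histogram. A stand-alone sketch of that idiom, decoupled from the operator (`SeriesCounting` is an illustrative name, not part of this commit; it assumes the `futures` and `prometheus` crates):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, Stream, StreamExt};
use prometheus::Histogram;

struct SeriesCounting<S> {
    inner: S,
    num_series: usize,
    histogram: Histogram,
}

impl<S, T, E> Stream for SeriesCounting<S>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match ready!(self.inner.poll_next_unpin(cx)) {
            // Each Ok item is one series; count it and pass it through.
            Some(Ok(item)) => {
                self.num_series += 1;
                Poll::Ready(Some(Ok(item)))
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
            // Input exhausted: flush the per-partition total exactly once.
            None => {
                self.histogram.observe(self.num_series as f64);
                Poll::Ready(None)
            }
        }
    }
}
```

The diff inlines this directly in each operator's `poll_next` instead of using a wrapper, so the same poll also feeds the `elapsed_compute` baseline metric.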

View File

@@ -23,7 +23,9 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Stat
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
+};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream,
@@ -32,13 +34,14 @@ use datatypes::arrow::array::TimestampMillisecondArray;
 use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::error::Result as ArrowResult;
 use datatypes::arrow::record_batch::RecordBatch;
-use futures::{Stream, StreamExt};
+use futures::{ready, Stream, StreamExt};
 use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
+use crate::metrics::PROMQL_SERIES_COUNT;
 /// Normalize the input record batch. Notice that for simplicity, this method assumes
 /// the input batch only contains sample points from one time series.
@@ -205,6 +208,14 @@ impl ExecutionPlan for SeriesNormalizeExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let metrics_builder = MetricBuilder::new(&self.metric);
+        let num_series = Count::new();
+        metrics_builder
+            .with_partition(partition)
+            .build(MetricValue::Count {
+                name: METRIC_NUM_SERIES.into(),
+                count: num_series.clone(),
+            });
         let input = self.input.execute(partition, context)?;
         let schema = input.schema();
@@ -219,6 +230,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
             schema,
             input,
             metric: baseline_metric,
+            num_series,
         }))
     }
@@ -258,6 +270,8 @@ pub struct SeriesNormalizeStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
+    /// Number of series processed.
+    num_series: Count,
 }
 impl SeriesNormalizeStream {
@@ -324,12 +338,19 @@ impl Stream for SeriesNormalizeStream {
     type Item = DataFusionResult<RecordBatch>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let poll = match self.input.poll_next_unpin(cx) {
-            Poll::Ready(batch) => {
-                let _timer = self.metric.elapsed_compute().timer();
-                Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.normalize(batch))))
+        let timer = std::time::Instant::now();
+        let poll = match ready!(self.input.poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                self.num_series.add(1);
+                let result = Ok(batch).and_then(|batch| self.normalize(batch));
+                self.metric.elapsed_compute().add_elapsed(timer);
+                Poll::Ready(Some(result))
            }
-            Poll::Pending => Poll::Pending,
+            None => {
+                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
+                Poll::Ready(None)
+            }
+            Some(Err(e)) => Poll::Ready(Some(Err(e))),
         };
         self.metric.record_poll(poll)
     }
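
Note: `PROMQL_SERIES_COUNT` itself comes from `crate::metrics` and is not part of this diff; the calls above only require it to behave like a Prometheus histogram (`observe(f64)`). A plausible shape, purely for orientation, with the metric name and buckets being assumptions:

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};

lazy_static! {
    /// Distribution of the number of series handled by a PromQL plan node.
    pub static ref PROMQL_SERIES_COUNT: Histogram = register_histogram!(
        "promql_series_count", // assumed metric name
        "number of series processed per PromQL operator partition",
        vec![1.0, 10.0, 100.0, 1_000.0, 10_000.0] // assumed buckets
    )
    .unwrap();
}
```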

View File

@@ -29,19 +29,22 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion::physical_expr::EquivalenceProperties;
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
+};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datafusion::sql::TableReference;
-use futures::{Stream, StreamExt};
+use futures::{ready, Stream, StreamExt};
 use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result};
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
+use crate::metrics::PROMQL_SERIES_COUNT;
 use crate::range_array::RangeArray;
 /// Time series manipulator for range function.
@@ -359,6 +362,14 @@ impl ExecutionPlan for RangeManipulateExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let metrics_builder = MetricBuilder::new(&self.metric);
+        let num_series = Count::new();
+        metrics_builder
+            .with_partition(partition)
+            .build(MetricValue::Count {
+                name: METRIC_NUM_SERIES.into(),
+                count: num_series.clone(),
+            });
         let input = self.input.execute(partition, context)?;
         let schema = input.schema();
@@ -389,6 +400,7 @@ impl ExecutionPlan for RangeManipulateExec {
             output_schema: self.output_schema.clone(),
             input,
             metric: baseline_metric,
+            num_series,
         }))
     }
@@ -448,6 +460,8 @@ pub struct RangeManipulateStream {
     output_schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
+    /// Number of series processed.
+    num_series: Count,
 }
 impl RecordBatchStream for RangeManipulateStream {
@@ -460,19 +474,24 @@ impl Stream for RangeManipulateStream {
     type Item = DataFusionResult<RecordBatch>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let timer = std::time::Instant::now();
         let poll = loop {
-            match self.input.poll_next_unpin(cx) {
-                Poll::Ready(Some(Ok(batch))) => {
-                    let _timer = self.metric.elapsed_compute().timer();
+            match ready!(self.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
                     let result = self.manipulate(batch);
                     if let Ok(None) = result {
                         continue;
                     } else {
+                        self.num_series.add(1);
+                        self.metric.elapsed_compute().add_elapsed(timer);
                        break Poll::Ready(result.transpose());
                     }
                 }
-                Poll::Ready(other) => break Poll::Ready(other),
-                Poll::Pending => break Poll::Pending,
+                None => {
+                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
+                    break Poll::Ready(None);
+                }
+                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
             }
         };
         self.metric.record_poll(poll)
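
Note: `RangeManipulate` differs from the other operators in one detail. When `manipulate` yields `Ok(None)` the loop `continue`s without touching the counter, exactly as it skipped empty outputs before, so `num_series` here counts only series that actually produce an output window.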

View File

@@ -26,7 +26,9 @@ use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
 use datafusion::physical_plan::expressions::Column as ColumnExpr;
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
+};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream,
@@ -38,6 +40,7 @@ use prost::Message;
 use snafu::ResultExt;
 use crate::error::{DeserializeSnafu, Result};
+use crate::extension_plan::METRIC_NUM_SERIES;
 use crate::metrics::PROMQL_SERIES_COUNT;
 #[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
@@ -190,6 +193,14 @@ impl ExecutionPlan for SeriesDivideExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let metrics_builder = MetricBuilder::new(&self.metric);
+        let num_series = Count::new();
+        metrics_builder
+            .with_partition(partition)
+            .build(MetricValue::Count {
+                name: METRIC_NUM_SERIES.into(),
+                count: num_series.clone(),
+            });
         let input = self.input.execute(partition, context)?;
         let schema = input.schema();
@@ -209,7 +220,7 @@ impl ExecutionPlan for SeriesDivideExec {
             schema,
             input,
             metric: baseline_metric,
-            num_series: 0,
+            num_series,
             inspect_start: 0,
         }))
     }
@@ -240,9 +251,10 @@ pub struct SeriesDivideStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
-    num_series: usize,
     /// Index of buffered batches to start inspect next time.
     inspect_start: usize,
+    /// Number of series processed.
+    num_series: Count,
 }
 impl RecordBatchStream for SeriesDivideStream {
@@ -279,7 +291,7 @@ impl Stream for SeriesDivideStream {
                     let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
                     self.inspect_start = 0;
-                    self.num_series += 1;
+                    self.num_series.add(1);
                     self.metric.elapsed_compute().add_elapsed(timer);
                     return Poll::Ready(Some(Ok(result_batch)));
                 } else {
@@ -293,7 +305,7 @@ impl Stream for SeriesDivideStream {
                         let result = compute::concat_batches(&self.schema, &self.buffer)?;
                         self.buffer.clear();
                         self.inspect_start = 0;
-                        self.num_series += 1;
+                        self.num_series.add(1);
                         self.metric.elapsed_compute().add_elapsed(timer);
                         return Poll::Ready(Some(Ok(result)));
                     }
@@ -302,7 +314,7 @@ impl Stream for SeriesDivideStream {
                 let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                     Some(Ok(batch)) => batch,
                     None => {
-                        PROMQL_SERIES_COUNT.observe(self.num_series as f64);
+                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                         return Poll::Ready(None);
                     }
                     error => return Poll::Ready(error),
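
Note: `SeriesDivide` previously tracked the count in a plain `usize` that only fed the Prometheus histogram; switching it to a registered `Count` makes the per-partition totals visible through the plan's `MetricsSet` (e.g. in `EXPLAIN ANALYZE` output) as well. A sketch of reading it back after execution, assuming DataFusion's `MetricsSet::sum_by_name` and `MetricValue::as_usize` helpers:

```rust
use datafusion::physical_plan::ExecutionPlan;

/// Total "num_series" across all partitions of one operator, if it has run.
fn num_series_of(plan: &dyn ExecutionPlan) -> Option<usize> {
    plan.metrics()
        .and_then(|metrics| metrics.sum_by_name("num_series"))
        .map(|value| value.as_usize())
}
```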