From 7bbc87b3c09808cbbebc7c61402b68c750ba46dc Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 13 Feb 2025 23:49:28 -0800 Subject: [PATCH] feat(promql): add series count metrics (#5534) Signed-off-by: Ruihang Xia Co-authored-by: Weny Xu --- src/promql/src/extension_plan.rs | 2 + .../src/extension_plan/instant_manipulate.rs | 37 +++++++++++++++---- src/promql/src/extension_plan/normalize.rs | 37 +++++++++++++++---- .../src/extension_plan/range_manipulate.rs | 35 ++++++++++++++---- .../src/extension_plan/series_divide.rs | 24 +++++++++--- 5 files changed, 105 insertions(+), 30 deletions(-) diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index eba327c1bf..c526676d14 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -36,3 +36,5 @@ pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream}; pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream}; pub type Millisecond = ::Native; + +const METRIC_NUM_SERIES: &str = "num_series"; diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index ec28f9a600..e9f662b1a1 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -26,20 +26,23 @@ use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, +}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datatypes::arrow::compute; use datatypes::arrow::error::Result as ArrowResult; -use futures::{Stream, StreamExt}; +use futures::{ready, Stream, StreamExt}; use greptime_proto::substrait_extension as pb; use prost::Message; use snafu::ResultExt; use crate::error::{DeserializeSnafu, Result}; -use crate::extension_plan::Millisecond; +use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES}; +use crate::metrics::PROMQL_SERIES_COUNT; /// Manipulate the input record batch to make it suitable for Instant Operator. /// @@ -242,6 +245,14 @@ impl ExecutionPlan for InstantManipulateExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let metrics_builder = MetricBuilder::new(&self.metric); + let num_series = Count::new(); + metrics_builder + .with_partition(partition) + .build(MetricValue::Count { + name: METRIC_NUM_SERIES.into(), + count: num_series.clone(), + }); let input = self.input.execute(partition, context)?; let schema = input.schema(); @@ -264,6 +275,7 @@ impl ExecutionPlan for InstantManipulateExec { schema, input, metric: baseline_metric, + num_series, })) } @@ -326,6 +338,8 @@ pub struct InstantManipulateStream { schema: SchemaRef, input: SendableRecordBatchStream, metric: BaselineMetrics, + /// Number of series processed. + num_series: Count, } impl RecordBatchStream for InstantManipulateStream { @@ -338,12 +352,19 @@ impl Stream for InstantManipulateStream { type Item = DataFusionResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let poll = match self.input.poll_next_unpin(cx) { - Poll::Ready(batch) => { - let _timer = self.metric.elapsed_compute().timer(); - Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch)))) + let timer = std::time::Instant::now(); + let poll = match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + self.num_series.add(1); + let result = Ok(batch).and_then(|batch| self.manipulate(batch)); + self.metric.elapsed_compute().add_elapsed(timer); + Poll::Ready(Some(result)) } - Poll::Pending => Poll::Pending, + None => { + PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64); + Poll::Ready(None) + } + Some(Err(e)) => Poll::Ready(Some(Err(e))), }; self.metric.record_poll(poll) } diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index 96be166801..c23d5d41eb 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -23,7 +23,9 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Stat use datafusion::error::DataFusionError; use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, +}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -32,13 +34,14 @@ use datatypes::arrow::array::TimestampMillisecondArray; use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::error::Result as ArrowResult; use datatypes::arrow::record_batch::RecordBatch; -use futures::{Stream, StreamExt}; +use futures::{ready, Stream, StreamExt}; use greptime_proto::substrait_extension as pb; use prost::Message; use snafu::ResultExt; use crate::error::{DeserializeSnafu, Result}; -use crate::extension_plan::Millisecond; +use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES}; +use crate::metrics::PROMQL_SERIES_COUNT; /// Normalize the input record batch. Notice that for simplicity, this method assumes /// the input batch only contains sample points from one time series. @@ -205,6 +208,14 @@ impl ExecutionPlan for SeriesNormalizeExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let metrics_builder = MetricBuilder::new(&self.metric); + let num_series = Count::new(); + metrics_builder + .with_partition(partition) + .build(MetricValue::Count { + name: METRIC_NUM_SERIES.into(), + count: num_series.clone(), + }); let input = self.input.execute(partition, context)?; let schema = input.schema(); @@ -219,6 +230,7 @@ impl ExecutionPlan for SeriesNormalizeExec { schema, input, metric: baseline_metric, + num_series, })) } @@ -258,6 +270,8 @@ pub struct SeriesNormalizeStream { schema: SchemaRef, input: SendableRecordBatchStream, metric: BaselineMetrics, + /// Number of series processed. + num_series: Count, } impl SeriesNormalizeStream { @@ -324,12 +338,19 @@ impl Stream for SeriesNormalizeStream { type Item = DataFusionResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let poll = match self.input.poll_next_unpin(cx) { - Poll::Ready(batch) => { - let _timer = self.metric.elapsed_compute().timer(); - Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.normalize(batch)))) + let timer = std::time::Instant::now(); + let poll = match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + self.num_series.add(1); + let result = Ok(batch).and_then(|batch| self.normalize(batch)); + self.metric.elapsed_compute().add_elapsed(timer); + Poll::Ready(Some(result)) } - Poll::Pending => Poll::Pending, + None => { + PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64); + Poll::Ready(None) + } + Some(Err(e)) => Poll::Ready(Some(Err(e))), }; self.metric.record_poll(poll) } diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index c2c4f9b42f..1dd7bf5515 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -29,19 +29,22 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, +}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datafusion::sql::TableReference; -use futures::{Stream, StreamExt}; +use futures::{ready, Stream, StreamExt}; use greptime_proto::substrait_extension as pb; use prost::Message; use snafu::ResultExt; use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result}; -use crate::extension_plan::Millisecond; +use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES}; +use crate::metrics::PROMQL_SERIES_COUNT; use crate::range_array::RangeArray; /// Time series manipulator for range function. @@ -359,6 +362,14 @@ impl ExecutionPlan for RangeManipulateExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let metrics_builder = MetricBuilder::new(&self.metric); + let num_series = Count::new(); + metrics_builder + .with_partition(partition) + .build(MetricValue::Count { + name: METRIC_NUM_SERIES.into(), + count: num_series.clone(), + }); let input = self.input.execute(partition, context)?; let schema = input.schema(); @@ -389,6 +400,7 @@ impl ExecutionPlan for RangeManipulateExec { output_schema: self.output_schema.clone(), input, metric: baseline_metric, + num_series, })) } @@ -448,6 +460,8 @@ pub struct RangeManipulateStream { output_schema: SchemaRef, input: SendableRecordBatchStream, metric: BaselineMetrics, + /// Number of series processed. + num_series: Count, } impl RecordBatchStream for RangeManipulateStream { @@ -460,19 +474,24 @@ impl Stream for RangeManipulateStream { type Item = DataFusionResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let timer = std::time::Instant::now(); let poll = loop { - match self.input.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(batch))) => { - let _timer = self.metric.elapsed_compute().timer(); + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { let result = self.manipulate(batch); if let Ok(None) = result { continue; } else { + self.num_series.add(1); + self.metric.elapsed_compute().add_elapsed(timer); break Poll::Ready(result.transpose()); } } - Poll::Ready(other) => break Poll::Ready(other), - Poll::Pending => break Poll::Pending, + None => { + PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64); + break Poll::Ready(None); + } + Some(Err(e)) => break Poll::Ready(Some(Err(e))), } }; self.metric.record_poll(poll) diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index b5fc923c00..e0e0172d4f 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -26,7 +26,9 @@ use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement}; use datafusion::physical_plan::expressions::Column as ColumnExpr; -use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, +}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -38,6 +40,7 @@ use prost::Message; use snafu::ResultExt; use crate::error::{DeserializeSnafu, Result}; +use crate::extension_plan::METRIC_NUM_SERIES; use crate::metrics::PROMQL_SERIES_COUNT; #[derive(Debug, PartialEq, Eq, Hash, PartialOrd)] @@ -190,6 +193,14 @@ impl ExecutionPlan for SeriesDivideExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let metrics_builder = MetricBuilder::new(&self.metric); + let num_series = Count::new(); + metrics_builder + .with_partition(partition) + .build(MetricValue::Count { + name: METRIC_NUM_SERIES.into(), + count: num_series.clone(), + }); let input = self.input.execute(partition, context)?; let schema = input.schema(); @@ -209,7 +220,7 @@ impl ExecutionPlan for SeriesDivideExec { schema, input, metric: baseline_metric, - num_series: 0, + num_series, inspect_start: 0, })) } @@ -240,9 +251,10 @@ pub struct SeriesDivideStream { schema: SchemaRef, input: SendableRecordBatchStream, metric: BaselineMetrics, - num_series: usize, /// Index of buffered batches to start inspect next time. inspect_start: usize, + /// Number of series processed. + num_series: Count, } impl RecordBatchStream for SeriesDivideStream { @@ -279,7 +291,7 @@ impl Stream for SeriesDivideStream { let result_batch = compute::concat_batches(&self.schema, &result_batches)?; self.inspect_start = 0; - self.num_series += 1; + self.num_series.add(1); self.metric.elapsed_compute().add_elapsed(timer); return Poll::Ready(Some(Ok(result_batch))); } else { @@ -293,7 +305,7 @@ impl Stream for SeriesDivideStream { let result = compute::concat_batches(&self.schema, &self.buffer)?; self.buffer.clear(); self.inspect_start = 0; - self.num_series += 1; + self.num_series.add(1); self.metric.elapsed_compute().add_elapsed(timer); return Poll::Ready(Some(Ok(result))); } @@ -302,7 +314,7 @@ impl Stream for SeriesDivideStream { let batch = match ready!(self.as_mut().fetch_next_batch(cx)) { Some(Ok(batch)) => batch, None => { - PROMQL_SERIES_COUNT.observe(self.num_series as f64); + PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64); return Poll::Ready(None); } error => return Poll::Ready(error),