From 07be50403ec91c4d21c264663526da8838705fa3 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 10 Oct 2023 14:55:25 +0800 Subject: [PATCH] feat: add basic metrics to query (#2559) * add metrics to merge scan Signed-off-by: Ruihang Xia * count series in promql Signed-off-by: Ruihang Xia * tweak label name Signed-off-by: Ruihang Xia * tweak label name Signed-off-by: Ruihang Xia * document metric label Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/promql/Cargo.toml | 1 + .../src/extension_plan/series_divide.rs | 10 +++++++++- src/promql/src/lib.rs | 1 + src/promql/src/metrics.rs | 16 +++++++++++++++ src/query/src/dist_plan/merge_scan.rs | 20 ++++++++++++++++++- src/query/src/metrics.rs | 3 +++ 7 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 src/promql/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index da7ff94614..3c320e4c78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7036,6 +7036,7 @@ dependencies = [ "datatypes", "futures", "greptime-proto", + "metrics", "promql-parser", "prost", "query", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 00f55ce296..990197a34c 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -17,6 +17,7 @@ datafusion.workspace = true datatypes = { workspace = true } futures = "0.3" greptime-proto.workspace = true +metrics = { workspace = true } promql-parser = "0.1.1" prost.workspace = true session = { workspace = true } diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 3487fe5786..fdf2c9cdf9 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -38,6 +38,7 @@ use prost::Message; use snafu::ResultExt; use crate::error::{DeserializeSnafu, Result}; +use crate::metrics::PROMQL_SERIES_COUNT; #[derive(Debug, PartialEq, Eq, Hash)] pub struct SeriesDivide { @@ -204,6 +205,7 @@ impl ExecutionPlan for SeriesDivideExec { schema, input, metric: baseline_metric, + num_series: 0, })) } @@ -239,6 +241,7 @@ pub struct SeriesDivideStream { schema: SchemaRef, input: SendableRecordBatchStream, metric: BaselineMetrics, + num_series: usize, } impl RecordBatchStream for SeriesDivideStream { @@ -259,6 +262,7 @@ impl Stream for SeriesDivideStream { Some(Ok(batch)) => batch, None => { self.buffer = None; + self.num_series += 1; return Poll::Ready(Some(Ok(batch))); } error => return Poll::Ready(error), @@ -271,12 +275,16 @@ impl Stream for SeriesDivideStream { let result_batch = batch.slice(0, same_length); let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length); self.buffer = Some(remaining_batch); + self.num_series += 1; return Poll::Ready(Some(Ok(result_batch))); } } else { let batch = match ready!(self.as_mut().fetch_next_batch(cx)) { Some(Ok(batch)) => batch, - None => return Poll::Ready(None), + None => { + metrics::histogram!(PROMQL_SERIES_COUNT, self.num_series as f64); + return Poll::Ready(None); + } error => return Poll::Ready(error), }; self.buffer = Some(batch); diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 1f636dcb53..9514a01538 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -18,5 +18,6 @@ pub mod error; pub mod extension_plan; pub mod functions; +mod metrics; pub mod planner; pub mod range_array; diff --git a/src/promql/src/metrics.rs b/src/promql/src/metrics.rs new file mode 100644 index 0000000000..b8bebf7a43 --- /dev/null +++ b/src/promql/src/metrics.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Counter for the number of series processed per query. +pub static PROMQL_SERIES_COUNT: &str = "promql.series_count"; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index ce07b9e5dd..cc06f84d82 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::sync::Arc; +use std::time::Duration; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::stream; @@ -39,8 +40,12 @@ use futures_util::StreamExt; use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; use snafu::ResultExt; use store_api::storage::RegionId; +use tokio::time::Instant; use crate::error::ConvertSchemaSnafu; +use crate::metrics::{ + METRIC_MERGE_SCAN_ERRORS_TOTAL, METRIC_MERGE_SCAN_POLL_ELAPSED, METRIC_MERGE_SCAN_REGIONS, +}; use crate::region_query::RegionQueryHandlerRef; #[derive(Debug, Hash, PartialEq, Eq, Clone)] @@ -161,6 +166,7 @@ impl MergeScanExec { let trace_id = trace_id().unwrap_or_default(); let stream = Box::pin(stream!({ + metrics::histogram!(METRIC_MERGE_SCAN_REGIONS, regions.len() as f64); let _finish_timer = metric.finish_time().timer(); let mut ready_timer = metric.ready_time().timer(); let mut first_consume_timer = Some(metric.first_consume_time().timer()); @@ -178,12 +184,21 @@ impl MergeScanExec { let mut stream = region_query_handler .do_get(request) .await - .map_err(BoxedError::new) + .map_err(|e| { + metrics::increment_counter!(METRIC_MERGE_SCAN_ERRORS_TOTAL); + BoxedError::new(e) + }) .context(ExternalSnafu)?; ready_timer.stop(); + let mut poll_duration = Duration::new(0, 0); + + let mut poll_timer = Instant::now(); while let Some(batch) = stream.next().await { + let poll_elapsed = poll_timer.elapsed(); + poll_duration += poll_elapsed; + let batch = batch?; // reconstruct batch using `self.schema` // to remove metadata and correct column name @@ -193,7 +208,10 @@ impl MergeScanExec { first_consume_timer.stop(); } yield Ok(batch); + // reset poll timer + poll_timer = Instant::now(); } + metrics::histogram!(METRIC_MERGE_SCAN_POLL_ELAPSED, poll_duration.as_secs_f64()); } })); diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index 489e7fb62a..7efac4e414 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -18,3 +18,6 @@ pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_e pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed"; pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed"; pub static METRIC_EXEC_PLAN_ELAPSED: &str = "query.execute_plan_elapsed"; +pub static METRIC_MERGE_SCAN_POLL_ELAPSED: &str = "query.merge_scan.poll_elapsed"; +pub static METRIC_MERGE_SCAN_REGIONS: &str = "query.merge_scan.regions"; +pub static METRIC_MERGE_SCAN_ERRORS_TOTAL: &str = "query.merge_scan.errors_total";