feat: add basic metrics to query (#2559)

* add metrics to merge scan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* count series in promql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak label name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak label name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* document metric label

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2023-10-10 14:55:25 +08:00 (committed by GitHub)
Parent: 8bdef9a348
Commit: 07be50403e
7 changed files with 50 additions and 2 deletions

Cargo.lock (generated)

@@ -7036,6 +7036,7 @@ dependencies = [
  "datatypes",
  "futures",
  "greptime-proto",
+ "metrics",
  "promql-parser",
  "prost",
  "query",

src/promql/Cargo.toml

@@ -17,6 +17,7 @@ datafusion.workspace = true
 datatypes = { workspace = true }
 futures = "0.3"
 greptime-proto.workspace = true
+metrics = { workspace = true }
 promql-parser = "0.1.1"
 prost.workspace = true
 session = { workspace = true }

src/promql/src/extension_plan/series_divide.rs

@@ -38,6 +38,7 @@ use prost::Message;
 use snafu::ResultExt;

 use crate::error::{DeserializeSnafu, Result};
+use crate::metrics::PROMQL_SERIES_COUNT;

 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct SeriesDivide {
@@ -204,6 +205,7 @@ impl ExecutionPlan for SeriesDivideExec {
             schema,
             input,
             metric: baseline_metric,
+            num_series: 0,
         }))
     }
@@ -239,6 +241,7 @@ pub struct SeriesDivideStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
+    num_series: usize,
 }
impl RecordBatchStream for SeriesDivideStream {
@@ -259,6 +262,7 @@ impl Stream for SeriesDivideStream {
                     Some(Ok(batch)) => batch,
                     None => {
                         self.buffer = None;
+                        self.num_series += 1;
                         return Poll::Ready(Some(Ok(batch)));
                     }
                     error => return Poll::Ready(error),
@@ -271,12 +275,16 @@
                 let result_batch = batch.slice(0, same_length);
                 let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
                 self.buffer = Some(remaining_batch);
+                self.num_series += 1;
                 return Poll::Ready(Some(Ok(result_batch)));
             }
         } else {
             let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                 Some(Ok(batch)) => batch,
-                None => return Poll::Ready(None),
+                None => {
+                    metrics::histogram!(PROMQL_SERIES_COUNT, self.num_series as f64);
+                    return Poll::Ready(None);
+                }
                 error => return Poll::Ready(error),
             };
             self.buffer = Some(batch);
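For context, the hunks above only bump num_series when a batch boundary closes out one time series, and emit a single histogram sample once the input stream is exhausted. Below is a minimal standalone sketch of that counting pattern, assuming the same pre-0.22 `metrics` facade the diff uses; the sorted-slice input and the divide_series helper are illustrative stand-ins, not part of the commit:

    // Sketch: count runs of equal keys in a key-sorted input and record the
    // total once at end-of-input, mirroring SeriesDivideStream::poll_next.
    // The `metrics` macros are no-ops unless a recorder is installed.

    pub static PROMQL_SERIES_COUNT: &str = "promql.series_count";

    fn divide_series(sorted_keys: &[&str]) -> Vec<Vec<String>> {
        let mut series: Vec<Vec<String>> = Vec::new();
        let mut num_series = 0usize;
        let mut prev: Option<&str> = None;

        for &key in sorted_keys {
            if prev != Some(key) {
                // A new key value starts a new series, matching the
                // `self.num_series += 1` branches in the stream.
                series.push(Vec::new());
                num_series += 1;
            }
            series.last_mut().expect("just pushed").push(key.to_string());
            prev = Some(key);
        }

        // One sample per query, emitted when the stream returns `None`.
        metrics::histogram!(PROMQL_SERIES_COUNT, num_series as f64);
        series
    }

    fn main() {
        let chunks = divide_series(&["a", "a", "b", "c", "c"]);
        assert_eq!(chunks.len(), 3);
    }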

src/promql/src/lib.rs

@@ -18,5 +18,6 @@
 pub mod error;
 pub mod extension_plan;
 pub mod functions;
+mod metrics;
 pub mod planner;
 pub mod range_array;

src/promql/src/metrics.rs (new file)

@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Histogram of the number of series processed in each PromQL query.
+pub static PROMQL_SERIES_COUNT: &str = "promql.series_count";
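Because `promql.series_count` is recorded through the `metrics` facade, how it surfaces depends on whichever recorder the binary installs. A hedged sketch of inspecting it with `metrics-exporter-prometheus` follows; that exporter and the hard-coded sample value are assumptions for the demo, since this commit only defines the name:

    use metrics_exporter_prometheus::PrometheusBuilder;

    pub static PROMQL_SERIES_COUNT: &str = "promql.series_count";

    fn main() {
        // Install an in-process recorder; GreptimeDB wires its own elsewhere.
        let handle = PrometheusBuilder::new()
            .install_recorder()
            .expect("install prometheus recorder");

        // One histogram sample per finished query.
        metrics::histogram!(PROMQL_SERIES_COUNT, 42.0);

        // Characters invalid in Prometheus names, such as '.', are sanitized
        // to '_', so this renders under promql_series_count.
        println!("{}", handle.render());
    }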

src/query/src/dist_plan/merge_scan.rs

@@ -14,6 +14,7 @@
 use std::any::Any;
 use std::sync::Arc;
+use std::time::Duration;

 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use async_stream::stream;
@@ -39,8 +40,12 @@ use futures_util::StreamExt;
 use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
 use snafu::ResultExt;
 use store_api::storage::RegionId;
+use tokio::time::Instant;

 use crate::error::ConvertSchemaSnafu;
+use crate::metrics::{
+    METRIC_MERGE_SCAN_ERRORS_TOTAL, METRIC_MERGE_SCAN_POLL_ELAPSED, METRIC_MERGE_SCAN_REGIONS,
+};
 use crate::region_query::RegionQueryHandlerRef;

 #[derive(Debug, Hash, PartialEq, Eq, Clone)]
@@ -161,6 +166,7 @@ impl MergeScanExec {
         let trace_id = trace_id().unwrap_or_default();

         let stream = Box::pin(stream!({
+            metrics::histogram!(METRIC_MERGE_SCAN_REGIONS, regions.len() as f64);
             let _finish_timer = metric.finish_time().timer();
             let mut ready_timer = metric.ready_time().timer();
             let mut first_consume_timer = Some(metric.first_consume_time().timer());
@@ -178,12 +184,21 @@
                 let mut stream = region_query_handler
                     .do_get(request)
                     .await
-                    .map_err(BoxedError::new)
+                    .map_err(|e| {
+                        metrics::increment_counter!(METRIC_MERGE_SCAN_ERRORS_TOTAL);
+                        BoxedError::new(e)
+                    })
                     .context(ExternalSnafu)?;

                 ready_timer.stop();

+                let mut poll_duration = Duration::new(0, 0);
+                let mut poll_timer = Instant::now();
                 while let Some(batch) = stream.next().await {
+                    let poll_elapsed = poll_timer.elapsed();
+                    poll_duration += poll_elapsed;
+
                     let batch = batch?;
                     // reconstruct batch using `self.schema`
                     // to remove metadata and correct column name
@@ -193,7 +208,10 @@
                         first_consume_timer.stop();
                     }
                     yield Ok(batch);
+                    // reset poll timer
+                    poll_timer = Instant::now();
                 }
+                metrics::histogram!(METRIC_MERGE_SCAN_POLL_ELAPSED, poll_duration.as_secs_f64());
             }
}));
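The poll-timing changes above follow one pattern: run the clock only while awaiting the upstream region stream, restart it after each yielded batch so downstream consumption is excluded, and record the accumulated total once the stream ends. A self-contained sketch of that pattern, with tokio and futures-util assumed as dependencies and an integer stream standing in for record batches:

    use std::time::{Duration, Instant};

    use futures_util::{Stream, StreamExt};

    pub static METRIC_MERGE_SCAN_POLL_ELAPSED: &str = "query.merge_scan.poll_elapsed";

    // Accumulate only upstream wait time: the clock runs across `next().await`
    // and is restarted after per-batch work, as merge_scan does after `yield`.
    async fn drain_and_time<S: Stream<Item = u64> + Unpin>(mut stream: S) -> Duration {
        let mut poll_duration = Duration::ZERO;
        let mut poll_timer = Instant::now();
        while let Some(_batch) = stream.next().await {
            poll_duration += poll_timer.elapsed();
            // ... per-batch consumer work happens here, outside the measurement ...
            poll_timer = Instant::now(); // reset poll timer
        }
        metrics::histogram!(METRIC_MERGE_SCAN_POLL_ELAPSED, poll_duration.as_secs_f64());
        poll_duration
    }

    #[tokio::main]
    async fn main() {
        let waited = drain_and_time(futures_util::stream::iter(0u64..3)).await;
        println!("upstream poll time: {waited:?}");
    }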

src/query/src/metrics.rs

@@ -18,3 +18,6 @@ pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_e
 pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed";
 pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed";
 pub static METRIC_EXEC_PLAN_ELAPSED: &str = "query.execute_plan_elapsed";
+pub static METRIC_MERGE_SCAN_POLL_ELAPSED: &str = "query.merge_scan.poll_elapsed";
+pub static METRIC_MERGE_SCAN_REGIONS: &str = "query.merge_scan.regions";
+pub static METRIC_MERGE_SCAN_ERRORS_TOTAL: &str = "query.merge_scan.errors_total";
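The `errors_total` name pairs with the `map_err` change in merge_scan.rs: the counter is bumped as a side effect while the error is converted, so each failed region request is counted exactly once on its way out. A minimal sketch of that shape, where the do_get stub and String error are stand-ins for the region-query handler and its error type:

    pub static METRIC_MERGE_SCAN_ERRORS_TOTAL: &str = "query.merge_scan.errors_total";

    // Stand-in for `region_query_handler.do_get(request)`.
    fn do_get(fail: bool) -> Result<&'static str, String> {
        if fail {
            Err("region unavailable".to_string())
        } else {
            Ok("record batch")
        }
    }

    fn fetch(fail: bool) -> Result<&'static str, Box<dyn std::error::Error>> {
        do_get(fail).map_err(|e| {
            // Count the failure while converting the error, as the diff does
            // before wrapping with `BoxedError::new`.
            metrics::increment_counter!(METRIC_MERGE_SCAN_ERRORS_TOTAL);
            Box::<dyn std::error::Error>::from(e)
        })
    }

    fn main() {
        assert!(fetch(false).is_ok());
        assert!(fetch(true).is_err()); // errors_total incremented once here
    }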