mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 18:22:55 +00:00
chore: add some metrics for grpc client (#1398)
* chore: add some metrics for grpc client * chore: add grpc preix and change metrics-exporter-ptometheus to add global prefix --------- Co-authored-by: paomian <qtang@greptime.com>
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -4797,9 +4797,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metrics-exporter-prometheus"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70"
|
||||
version = "0.11.1"
|
||||
source = "git+https://github.com/GreptimeTeam/metrics.git?rev=174de287e9f7f9f57c0272be56c95df156489476#174de287e9f7f9f57c0272be56c95df156489476"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"metrics",
|
||||
|
||||
@@ -82,6 +82,7 @@ tokio = { version = "1.24.2", features = ["full"] }
|
||||
tokio-util = { version = "0.7", features = ["io-util"] }
|
||||
tonic = { version = "0.9", features = ["tls"] }
|
||||
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
|
||||
metrics = "0.20"
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
|
||||
@@ -25,7 +25,7 @@ use arrow_flight::{FlightData, Ticket};
|
||||
use common_error::prelude::*;
|
||||
use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage};
|
||||
use common_query::Output;
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::{logging, timer};
|
||||
use futures_util::{TryFutureExt, TryStreamExt};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -33,7 +33,7 @@ use snafu::{ensure, ResultExt};
|
||||
use crate::error::{
|
||||
ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
|
||||
};
|
||||
use crate::{error, Client, Result};
|
||||
use crate::{error, metrics, Client, Result};
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Database {
|
||||
@@ -107,6 +107,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn insert(&self, request: InsertRequest) -> Result<u32> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
|
||||
let mut client = self.client.make_database_client()?.inner;
|
||||
let request = GreptimeRequest {
|
||||
header: Some(RequestHeader {
|
||||
@@ -130,6 +131,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn sql(&self, sql: &str) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_SQL);
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
query: Some(Query::Sql(sql.to_string())),
|
||||
}))
|
||||
@@ -137,6 +139,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_LOGICAL_PLAN);
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
query: Some(Query::LogicalPlan(logical_plan)),
|
||||
}))
|
||||
@@ -150,6 +153,7 @@ impl Database {
|
||||
end: &str,
|
||||
step: &str,
|
||||
) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_PROMQL_RANGE_QUERY);
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
query: Some(Query::PromRangeQuery(PromRangeQuery {
|
||||
query: promql.to_string(),
|
||||
@@ -162,6 +166,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_CREATE_TABLE);
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::CreateTable(expr)),
|
||||
}))
|
||||
@@ -169,6 +174,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_ALTER);
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::Alter(expr)),
|
||||
}))
|
||||
@@ -176,6 +182,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_DROP_TABLE);
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::DropTable(expr)),
|
||||
}))
|
||||
@@ -183,6 +190,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
|
||||
let _timer = timer!(metrics::METRIC_GRPC_FLUSH_TABLE);
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::FlushTable(expr)),
|
||||
}))
|
||||
@@ -190,6 +198,8 @@ impl Database {
|
||||
}
|
||||
|
||||
async fn do_get(&self, request: Request) -> Result<Output> {
|
||||
// FIXME(paomian): should be added some labels for metrics
|
||||
let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
|
||||
let request = GreptimeRequest {
|
||||
header: Some(RequestHeader {
|
||||
catalog: self.catalog.clone(),
|
||||
|
||||
@@ -16,6 +16,7 @@ mod client;
|
||||
mod database;
|
||||
mod error;
|
||||
pub mod load_balance;
|
||||
mod metrics;
|
||||
|
||||
pub use api;
|
||||
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
|
||||
24
src/client/src/metrics.rs
Normal file
24
src/client/src/metrics.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! client metrics
|
||||
pub const METRIC_GRPC_CREATE_TABLE: &str = "grpc.create_table";
|
||||
pub const METRIC_GRPC_PROMQL_RANGE_QUERY: &str = "grpc.promql.range_query";
|
||||
pub const METRIC_GRPC_INSERT: &str = "grpc.insert";
|
||||
pub const METRIC_GRPC_SQL: &str = "grpc.sql";
|
||||
pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan";
|
||||
pub const METRIC_GRPC_ALTER: &str = "grpc.alter";
|
||||
pub const METRIC_GRPC_DROP_TABLE: &str = "grpc.drop_table";
|
||||
pub const METRIC_GRPC_FLUSH_TABLE: &str = "grpc.flush_table";
|
||||
pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get";
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
async-trait.workspace = true
|
||||
common-error = { path = "../error" }
|
||||
common-telemetry = { path = "../telemetry" }
|
||||
metrics = "0.20"
|
||||
metrics.workspace = true
|
||||
once_cell = "1.12"
|
||||
paste.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -12,8 +12,8 @@ deadlock_detection = ["parking_lot"]
|
||||
backtrace = "0.3"
|
||||
common-error = { path = "../error" }
|
||||
console-subscriber = { version = "0.1", optional = true }
|
||||
metrics = "0.20.1"
|
||||
metrics-exporter-prometheus = { version = "0.11", default-features = false }
|
||||
metrics-exporter-prometheus = { git = "https://github.com/GreptimeTeam/metrics.git", rev = "174de287e9f7f9f57c0272be56c95df156489476", default-features = false }
|
||||
metrics.workspace = true
|
||||
once_cell = "1.10"
|
||||
opentelemetry = { version = "0.17", default-features = false, features = [
|
||||
"trace",
|
||||
|
||||
@@ -32,7 +32,9 @@ pub fn init_default_metrics_recorder() {
|
||||
|
||||
/// Init prometheus recorder.
|
||||
fn init_prometheus_recorder() {
|
||||
let recorder = PrometheusBuilder::new().build_recorder();
|
||||
let recorder = PrometheusBuilder::new()
|
||||
.add_global_prefix("greptime".to_string())
|
||||
.build_recorder();
|
||||
let mut h = PROMETHEUS_HANDLE.as_ref().write().unwrap();
|
||||
*h = Some(recorder.handle());
|
||||
// TODO(LFC): separate metrics for testing and metrics for production
|
||||
@@ -60,6 +62,7 @@ pub fn try_handle() -> Option<PrometheusHandle> {
|
||||
pub struct Timer {
|
||||
start: Instant,
|
||||
name: &'static str,
|
||||
labels: Vec<(String, String)>,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
@@ -67,9 +70,25 @@ impl Timer {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
name,
|
||||
labels: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_labels<S: Into<String> + Clone>(name: &'static str, labels: &[(S, S)]) -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
name,
|
||||
labels: labels
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone().into(), v.clone().into()))
|
||||
.collect::<Vec<_>>(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
|
||||
self.labels.as_mut()
|
||||
}
|
||||
|
||||
pub fn elapsed(&self) -> Duration {
|
||||
self.start.elapsed()
|
||||
}
|
||||
@@ -77,7 +96,11 @@ impl Timer {
|
||||
|
||||
impl Drop for Timer {
|
||||
fn drop(&mut self) {
|
||||
histogram!(self.name, self.start.elapsed());
|
||||
if !self.labels.is_empty() {
|
||||
histogram!(self.name, self.start.elapsed(), &self.labels);
|
||||
} else {
|
||||
histogram!(self.name, self.start.elapsed());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,6 +109,9 @@ macro_rules! timer {
|
||||
($name: expr) => {
|
||||
$crate::metric::Timer::new($name)
|
||||
};
|
||||
($name:expr,$labels:expr) => {
|
||||
$crate::metric::Timer::new_with_labels($name, $labels)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -109,4 +135,48 @@ mod tests {
|
||||
assert!(text.contains("test_elapsed_timer_a"));
|
||||
assert!(text.contains("test_elapsed_timer_b"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_elapsed_timer_with_label() {
|
||||
init_default_metrics_recorder();
|
||||
{
|
||||
let t = Timer::new("test_elapsed_timer_a");
|
||||
drop(t);
|
||||
}
|
||||
let handle = try_handle().unwrap();
|
||||
let text = handle.render();
|
||||
assert!(text.contains("test_elapsed_timer_a"));
|
||||
assert!(!text.contains("test_elapsed_timer_b"));
|
||||
let label_a = "label_a";
|
||||
let label_b = "label_b";
|
||||
let label_c = "label_c";
|
||||
let label_d = "label_d";
|
||||
let label_e = "label_e";
|
||||
assert!(!text.contains(label_a));
|
||||
assert!(!text.contains(label_b));
|
||||
|
||||
{
|
||||
let mut timer_b = timer!("test_elapsed_timer_b", &[(label_a, "a"), (label_b, "b")]);
|
||||
let labels = timer_b.labels_mut();
|
||||
labels.push((label_c.to_owned(), "d".to_owned()));
|
||||
}
|
||||
let text = handle.render();
|
||||
assert!(text.contains("test_elapsed_timer_a"));
|
||||
assert!(text.contains("test_elapsed_timer_b"));
|
||||
assert!(text.contains(label_a));
|
||||
assert!(text.contains(label_b));
|
||||
assert!(text.contains(label_c));
|
||||
|
||||
{
|
||||
let mut timer_c = timer!("test_elapsed_timer_c");
|
||||
let labels = timer_c.labels_mut();
|
||||
labels.push((label_d.to_owned(), "d".to_owned()));
|
||||
labels.push((label_e.to_owned(), "e".to_owned()));
|
||||
}
|
||||
|
||||
let text = handle.render();
|
||||
assert!(text.contains("test_elapsed_timer_c"));
|
||||
assert!(text.contains(label_d));
|
||||
assert!(text.contains(label_e));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ log = "0.4"
|
||||
log-store = { path = "../log-store" }
|
||||
meta-client = { path = "../meta-client" }
|
||||
meta-srv = { path = "../meta-srv", features = ["mock"] }
|
||||
metrics = "0.20"
|
||||
metrics.workspace = true
|
||||
mito = { path = "../mito", features = ["test"] }
|
||||
object-store = { path = "../object-store" }
|
||||
pin-project = "1.0"
|
||||
|
||||
@@ -27,7 +27,7 @@ datafusion-sql.workspace = true
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
futures-util.workspace = true
|
||||
metrics = "0.20"
|
||||
metrics.workspace = true
|
||||
once_cell = "1.10"
|
||||
promql = { path = "../promql" }
|
||||
promql-parser = "0.1.0"
|
||||
|
||||
@@ -39,7 +39,7 @@ http-body = "0.4"
|
||||
humantime-serde = "1.1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
|
||||
metrics = "0.20"
|
||||
metrics.workspace = true
|
||||
mime_guess = "2.0"
|
||||
num_cpus = "1.13"
|
||||
once_cell = "1.16"
|
||||
|
||||
Reference in New Issue
Block a user