From a7557b70f1593fa3edeb4c69bd0816b0e7046bcd Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 21 Jul 2023 10:02:43 +0800 Subject: [PATCH] feat: Add more tags for OTLP metrics protocol (#2003) * test: add integration tests for otlp * feat: add resource and scope attributes as tag --- Cargo.lock | 1 + src/servers/src/otlp.rs | 152 +++++++++++++++++++++++++--------- tests-integration/Cargo.toml | 1 + tests-integration/src/lib.rs | 1 + tests-integration/src/otlp.rs | 138 ++++++++++++++++++++++++++++++ 5 files changed, 252 insertions(+), 41 deletions(-) create mode 100644 tests-integration/src/otlp.rs diff --git a/Cargo.lock b/Cargo.lock index 436208ba1d..041a89e487 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9842,6 +9842,7 @@ dependencies = [ "mito", "object-store", "once_cell", + "opentelemetry-proto", "partition", "paste", "prost", diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index e65f33f5a3..b8ac49d007 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -44,18 +44,21 @@ fn normalize_otlp_name(name: &str) -> String { pub fn to_grpc_insert_requests( request: ExportMetricsServiceRequest, ) -> Result<(InsertRequests, usize)> { - let metrics = request - .resource_metrics - .iter() - .flat_map(|resource_metrics| &resource_metrics.scope_metrics) - .flat_map(|scope_metrics| &scope_metrics.metrics); - - let mut insert_batch = Vec::with_capacity(metrics.size_hint().0); + let mut insert_batch = Vec::new(); let mut rows = 0; - for metric in metrics { - if let Some(insert) = encode_metrics(metric)? { - rows += insert.row_count; - insert_batch.push(insert); + + for resource in request.resource_metrics { + let resource_attrs = resource.resource.map(|r| r.attributes); + for scope in resource.scope_metrics { + let scope_attrs = scope.scope.map(|s| s.attributes); + for metric in scope.metrics { + if let Some(insert) = + encode_metrics(&metric, resource_attrs.as_ref(), scope_attrs.as_ref())? 
+ { + rows += insert.row_count; + insert_batch.push(insert); + } + } } } @@ -66,15 +69,23 @@ pub fn to_grpc_insert_requests( Ok((inserts, rows as usize)) } -fn encode_metrics(metric: &Metric) -> Result<Option<InsertRequest>> { +fn encode_metrics( + metric: &Metric, + resource_attrs: Option<&Vec<KeyValue>>, + scope_attrs: Option<&Vec<KeyValue>>, +) -> Result<Option<InsertRequest>> { let name = &metric.name; // note that we don't store description or unit, we might want to deal with // these fields in the future. if let Some(data) = &metric.data { match data { - metric::Data::Gauge(gauge) => encode_gauge(name, gauge).map(Some), - metric::Data::Sum(sum) => encode_sum(name, sum).map(Some), - metric::Data::Summary(summary) => encode_summary(name, summary).map(Some), + metric::Data::Gauge(gauge) => { + encode_gauge(name, gauge, resource_attrs, scope_attrs).map(Some) + } + metric::Data::Sum(sum) => encode_sum(name, sum, resource_attrs, scope_attrs).map(Some), + metric::Data::Summary(summary) => { + encode_summary(name, summary, resource_attrs, scope_attrs).map(Some) + } // TODO(sunng87) leave histogram for next release metric::Data::Histogram(_hist) => Ok(None), metric::Data::ExponentialHistogram(_hist) => Ok(None), @@ -84,6 +95,15 @@ fn encode_metrics(metric: &Metric) -> Result<Option<InsertRequest>> { } } +fn write_attributes(lines: &mut LinesWriter, attrs: Option<&Vec<KeyValue>>) -> Result<()> { + if let Some(attrs) = attrs { + for attr in attrs { + write_attribute(lines, attr)?; + } + } + Ok(()) +} + fn write_attribute(lines: &mut LinesWriter, attr: &KeyValue) -> Result<()> { if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) { match val { @@ -136,16 +156,18 @@ fn write_data_point_value( /// /// note that there can be multiple data points in the request, it's going to be /// stored as multiple rows -fn encode_gauge(name: &str, gauge: &Gauge) -> Result<InsertRequest> { +fn encode_gauge( + name: &str, + gauge: &Gauge, + resource_attrs: Option<&Vec<KeyValue>>, + scope_attrs: Option<&Vec<KeyValue>>, +) -> Result<InsertRequest> { let mut lines = LinesWriter::with_lines(gauge.data_points.len()); - 
for data_point in &gauge.data_points { - for attr in &data_point.attributes { - write_attribute(&mut lines, attr)?; - } - + write_attributes(&mut lines, resource_attrs)?; + write_attributes(&mut lines, scope_attrs)?; + write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?; write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; - write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?; lines.commit(); @@ -163,13 +185,18 @@ fn encode_gauge(name: &str, gauge: &Gauge) -> Result<InsertRequest> { /// encode this sum metric /// /// `aggregation_temporality` and `monotonic` are ignored for now -fn encode_sum(name: &str, sum: &Sum) -> Result<InsertRequest> { +fn encode_sum( + name: &str, + sum: &Sum, + resource_attrs: Option<&Vec<KeyValue>>, + scope_attrs: Option<&Vec<KeyValue>>, +) -> Result<InsertRequest> { let mut lines = LinesWriter::with_lines(sum.data_points.len()); for data_point in &sum.data_points { - for attr in &data_point.attributes { - write_attribute(&mut lines, attr)?; - } + write_attributes(&mut lines, resource_attrs)?; + write_attributes(&mut lines, scope_attrs)?; + write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?; write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; @@ -289,13 +316,18 @@ fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Resu }) } -fn encode_summary(name: &str, summary: &Summary) -> Result<InsertRequest> { +fn encode_summary( + name: &str, + summary: &Summary, + resource_attrs: Option<&Vec<KeyValue>>, + scope_attrs: Option<&Vec<KeyValue>>, +) -> Result<InsertRequest> { let mut lines = LinesWriter::with_lines(summary.data_points.len()); for data_point in &summary.data_points { - for attr in &data_point.attributes { - write_attribute(&mut lines, attr)?; - } + write_attributes(&mut lines, resource_attrs)?; + write_attributes(&mut lines, scope_attrs)?; + write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?; write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; @@ -303,14 +335,14 @@ fn encode_summary(name: &str, summary: &Summary) -> Result<InsertRequest> 
{ // here we don't store bucket boundary lines .write_f64( - &format!("p{:02}", quantile.quantile * 100f64), + &format!("greptime_p{:02}", quantile.quantile * 100f64), quantile.value, ) .context(error::OtlpMetricsWriteSnafu)?; } lines - .write_u64("count", data_point.count) + .write_u64("greptime_count", data_point.count) .context(error::OtlpMetricsWriteSnafu)?; lines.commit(); @@ -370,18 +402,30 @@ mod tests { }, ]; let gauge = Gauge { data_points }; - let inserts = encode_gauge("datamon", &gauge).unwrap(); + let inserts = encode_gauge( + "datamon", + &gauge, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); assert_eq!(inserts.table_name, "datamon"); assert_eq!(inserts.row_count, 2); - assert_eq!(inserts.columns.len(), 3); + assert_eq!(inserts.columns.len(), 5); assert_eq!( inserts .columns .iter() .map(|c| &c.column_name) .collect::<Vec<_>>(), - vec!["host", "greptime_timestamp", "greptime_value"] + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value" + ] ); } @@ -405,18 +449,30 @@ mod tests { data_points, ..Default::default() }; - let inserts = encode_sum("datamon", &sum).unwrap(); + let inserts = encode_sum( + "datamon", + &sum, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); assert_eq!(inserts.table_name, "datamon"); assert_eq!(inserts.row_count, 2); - assert_eq!(inserts.columns.len(), 3); + assert_eq!(inserts.columns.len(), 5); assert_eq!( inserts .columns .iter() .map(|c| &c.column_name) .collect::<Vec<_>>(), - vec!["host", "greptime_timestamp", "greptime_value"] + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value" + ] ); } @@ -440,18 +496,32 @@ mod tests { ..Default::default() }]; let summary = Summary { data_points }; - let inserts = encode_summary("datamon", &summary).unwrap(); + let inserts = encode_summary( + "datamon", + &summary, + Some(&vec![keyvalue("resource", "app")]), + 
Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); assert_eq!(inserts.table_name, "datamon"); assert_eq!(inserts.row_count, 1); - assert_eq!(inserts.columns.len(), 5); + assert_eq!(inserts.columns.len(), 7); assert_eq!( inserts .columns .iter() .map(|c| &c.column_name) .collect::<Vec<_>>(), - vec!["host", "greptime_timestamp", "p90", "p95", "count"] + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_p90", + "greptime_p95", + "greptime_count" + ] ); } } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index e4cc07dde3..9eb044a094 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -69,3 +69,4 @@ paste.workspace = true prost.workspace = true script = { path = "../src/script" } store-api = { path = "../src/store-api" } +opentelemetry-proto.workspace = true diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index b871a9768a..46981cf439 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -18,6 +18,7 @@ mod grpc; mod influxdb; mod instance; mod opentsdb; +mod otlp; mod prom_store; pub mod test_util; diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs new file mode 100644 index 0000000000..e1fcf03cbb --- /dev/null +++ b/tests-integration/src/otlp.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#[cfg(test)] +mod test { + use std::sync::Arc; + + use client::DEFAULT_CATALOG_NAME; + use common_query::Output; + use common_recordbatch::RecordBatches; + use frontend::instance::Instance; + use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; + use opentelemetry_proto::tonic::common::v1::any_value::Value as Val; + use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue}; + use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value; + use opentelemetry_proto::tonic::metrics::v1::{metric, NumberDataPoint, *}; + use opentelemetry_proto::tonic::resource::v1::Resource; + use servers::query_handler::sql::SqlQueryHandler; + use servers::query_handler::OpenTelemetryProtocolHandler; + use session::context::QueryContext; + + use crate::tests; + + #[tokio::test(flavor = "multi_thread")] + pub async fn test_otlp_on_standalone() { + let standalone = tests::create_standalone_instance("test_standalone_otlp").await; + let instance = &standalone.instance; + + test_otlp(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + pub async fn test_otlp_on_distributed() { + let instance = tests::create_distributed_instance("test_standalone_otlp").await; + + test_otlp(&instance.frontend()).await; + } + + async fn test_otlp(instance: &Arc<Instance>) { + let req = build_request(); + let db = "otlp"; + let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db)); + + assert!(SqlQueryHandler::do_query( + instance.as_ref(), + &format!("CREATE DATABASE IF NOT EXISTS {db}"), + ctx.clone(), + ) + .await + .get(0) + .unwrap() + .is_ok()); + + let resp = instance.metrics(req, ctx.clone()).await.unwrap(); + assert!(resp.partial_success.is_none()); + + let mut output = instance + .do_query( + "SELECT * FROM my_test_metric ORDER BY greptime_timestamp", + ctx.clone(), + ) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = 
RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++------------+-------+------------+---------------------+----------------+ +| resource | scope | host | greptime_timestamp | greptime_value | ++------------+-------+------------+---------------------+----------------+ +| greptimedb | otel | testserver | 1970-01-01T00:00:00 | 105.0 | +| greptimedb | otel | testsevrer | 1970-01-01T00:00:00 | 100.0 | ++------------+-------+------------+---------------------+----------------+", + ); + } + + fn build_request() -> ExportMetricsServiceRequest { + let data_points = vec![ + NumberDataPoint { + attributes: vec![keyvalue("host", "testsevrer")], + time_unix_nano: 100, + value: Some(Value::AsInt(100)), + ..Default::default() + }, + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 105, + value: Some(Value::AsInt(105)), + ..Default::default() + }, + ]; + let gauge = Gauge { data_points }; + + ExportMetricsServiceRequest { + resource_metrics: vec![ResourceMetrics { + scope_metrics: vec![ScopeMetrics { + metrics: vec![Metric { + name: "my.test.metric".into(), + description: "my ignored desc".into(), + unit: "my ignored unit".into(), + data: Some(metric::Data::Gauge(gauge)), + }], + scope: Some(InstrumentationScope { + attributes: vec![keyvalue("scope", "otel")], + ..Default::default() + }), + ..Default::default() + }], + resource: Some(Resource { + attributes: vec![keyvalue("resource", "greptimedb")], + dropped_attributes_count: 0, + }), + ..Default::default() + }], + } + } + + fn keyvalue(key: &str, value: &str) -> KeyValue { + KeyValue { + key: key.into(), + value: Some(AnyValue { + value: Some(Val::StringValue(value.into())), + }), + } + } +}