feat: Add more tags for OTLP metrics protocol (#2003)

* test: add integration tests for otlp

* feat: add resource and scope attributes as tag
This commit is contained in:
Ning Sun
2023-07-21 10:02:43 +08:00
committed by GitHub
parent 51fe074666
commit a7557b70f1
5 changed files with 252 additions and 41 deletions

View File

@@ -44,18 +44,21 @@ fn normalize_otlp_name(name: &str) -> String {
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
) -> Result<(InsertRequests, usize)> {
let metrics = request
.resource_metrics
.iter()
.flat_map(|resource_metrics| &resource_metrics.scope_metrics)
.flat_map(|scope_metrics| &scope_metrics.metrics);
let mut insert_batch = Vec::with_capacity(metrics.size_hint().0);
let mut insert_batch = Vec::new();
let mut rows = 0;
for metric in metrics {
if let Some(insert) = encode_metrics(metric)? {
rows += insert.row_count;
insert_batch.push(insert);
for resource in request.resource_metrics {
let resource_attrs = resource.resource.map(|r| r.attributes);
for scope in resource.scope_metrics {
let scope_attrs = scope.scope.map(|s| s.attributes);
for metric in scope.metrics {
if let Some(insert) =
encode_metrics(&metric, resource_attrs.as_ref(), scope_attrs.as_ref())?
{
rows += insert.row_count;
insert_batch.push(insert);
}
}
}
}
@@ -66,15 +69,23 @@ pub fn to_grpc_insert_requests(
Ok((inserts, rows as usize))
}
fn encode_metrics(metric: &Metric) -> Result<Option<InsertRequest>> {
fn encode_metrics(
metric: &Metric,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<Option<InsertRequest>> {
let name = &metric.name;
// note that we don't store description or unit, we might want to deal with
// these fields in the future.
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => encode_gauge(name, gauge).map(Some),
metric::Data::Sum(sum) => encode_sum(name, sum).map(Some),
metric::Data::Summary(summary) => encode_summary(name, summary).map(Some),
metric::Data::Gauge(gauge) => {
encode_gauge(name, gauge, resource_attrs, scope_attrs).map(Some)
}
metric::Data::Sum(sum) => encode_sum(name, sum, resource_attrs, scope_attrs).map(Some),
metric::Data::Summary(summary) => {
encode_summary(name, summary, resource_attrs, scope_attrs).map(Some)
}
// TODO(sunng87) leave histogram for next release
metric::Data::Histogram(_hist) => Ok(None),
metric::Data::ExponentialHistogram(_hist) => Ok(None),
@@ -84,6 +95,15 @@ fn encode_metrics(metric: &Metric) -> Result<Option<InsertRequest>> {
}
}
fn write_attributes(lines: &mut LinesWriter, attrs: Option<&Vec<KeyValue>>) -> Result<()> {
if let Some(attrs) = attrs {
for attr in attrs {
write_attribute(lines, attr)?;
}
}
Ok(())
}
fn write_attribute(lines: &mut LinesWriter, attr: &KeyValue) -> Result<()> {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
match val {
@@ -136,16 +156,18 @@ fn write_data_point_value(
///
/// note that there can be multiple data points in the request, it's going to be
/// stored as multiple rows
fn encode_gauge(name: &str, gauge: &Gauge) -> Result<InsertRequest> {
fn encode_gauge(
name: &str,
gauge: &Gauge,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(gauge.data_points.len());
for data_point in &gauge.data_points {
for attr in &data_point.attributes {
write_attribute(&mut lines, attr)?;
}
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?;
lines.commit();
@@ -163,13 +185,18 @@ fn encode_gauge(name: &str, gauge: &Gauge) -> Result<InsertRequest> {
/// encode this sum metric
///
/// `aggregation_temporality` and `monotonic` are ignored for now
fn encode_sum(name: &str, sum: &Sum) -> Result<InsertRequest> {
fn encode_sum(
name: &str,
sum: &Sum,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(sum.data_points.len());
for data_point in &sum.data_points {
for attr in &data_point.attributes {
write_attribute(&mut lines, attr)?;
}
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
@@ -289,13 +316,18 @@ fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Resu
})
}
fn encode_summary(name: &str, summary: &Summary) -> Result<InsertRequest> {
fn encode_summary(
name: &str,
summary: &Summary,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<InsertRequest> {
let mut lines = LinesWriter::with_lines(summary.data_points.len());
for data_point in &summary.data_points {
for attr in &data_point.attributes {
write_attribute(&mut lines, attr)?;
}
write_attributes(&mut lines, resource_attrs)?;
write_attributes(&mut lines, scope_attrs)?;
write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
@@ -303,14 +335,14 @@ fn encode_summary(name: &str, summary: &Summary) -> Result<InsertRequest> {
// here we don't store bucket boundary
lines
.write_f64(
&format!("p{:02}", quantile.quantile * 100f64),
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
)
.context(error::OtlpMetricsWriteSnafu)?;
}
lines
.write_u64("count", data_point.count)
.write_u64("greptime_count", data_point.count)
.context(error::OtlpMetricsWriteSnafu)?;
lines.commit();
@@ -370,18 +402,30 @@ mod tests {
},
];
let gauge = Gauge { data_points };
let inserts = encode_gauge("datamon", &gauge).unwrap();
let inserts = encode_gauge(
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 3);
assert_eq!(inserts.columns.len(), 5);
assert_eq!(
inserts
.columns
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec!["host", "greptime_timestamp", "greptime_value"]
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
@@ -405,18 +449,30 @@ mod tests {
data_points,
..Default::default()
};
let inserts = encode_sum("datamon", &sum).unwrap();
let inserts = encode_sum(
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 3);
assert_eq!(inserts.columns.len(), 5);
assert_eq!(
inserts
.columns
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec!["host", "greptime_timestamp", "greptime_value"]
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
@@ -440,18 +496,32 @@ mod tests {
..Default::default()
}];
let summary = Summary { data_points };
let inserts = encode_summary("datamon", &summary).unwrap();
let inserts = encode_summary(
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 1);
assert_eq!(inserts.columns.len(), 5);
assert_eq!(inserts.columns.len(), 7);
assert_eq!(
inserts
.columns
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec!["host", "greptime_timestamp", "p90", "p95", "count"]
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_p90",
"greptime_p95",
"greptime_count"
]
);
}
}