mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-16 21:10:38 +00:00
feat: add otlp to prometheus naming translation options (#8113)
* chore: extract otlp translator Signed-off-by: shuiyisong <xixing.sys@gmail.com> * feat: add header parsing Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add tests Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add tests Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: simplify and merge tests Signed-off-by: shuiyisong <xixing.sys@gmail.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
@@ -20,6 +20,7 @@ use axum::http::StatusCode;
|
||||
use axum::http::request::Parts;
|
||||
use http::HeaderMap;
|
||||
use pipeline::{GreptimePipelineParams, SelectInfo, truthy};
|
||||
use session::protocol_ctx::OtlpMetricTranslationStrategy;
|
||||
|
||||
use crate::http::header::constants::{
|
||||
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
|
||||
@@ -27,7 +28,8 @@ use crate::http::header::constants::{
|
||||
GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME,
|
||||
GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME,
|
||||
GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME,
|
||||
GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
|
||||
GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME,
|
||||
GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
|
||||
GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME,
|
||||
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
|
||||
};
|
||||
@@ -147,6 +149,9 @@ pub struct OtlpMetricOptions {
|
||||
/// Persist scope attributes to the table
|
||||
/// If false, persist none
|
||||
pub promote_scope_attrs: bool,
|
||||
|
||||
/// Metric and label name translation strategy.
|
||||
pub metric_translation_strategy: OtlpMetricTranslationStrategy,
|
||||
}
|
||||
|
||||
impl<S> FromRequestParts<S> for OtlpMetricOptions
|
||||
@@ -181,14 +186,39 @@ where
|
||||
.map(truthy)
|
||||
.unwrap_or(false);
|
||||
|
||||
let metric_translation_strategy = string_value_from_header(
|
||||
headers,
|
||||
&[GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME],
|
||||
)?
|
||||
.map(|value| parse_otlp_metric_translation_strategy(&value))
|
||||
.transpose()?
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(OtlpMetricOptions {
|
||||
promote_all_resource_attrs,
|
||||
resource_attrs,
|
||||
promote_scope_attrs,
|
||||
metric_translation_strategy,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_otlp_metric_translation_strategy(
|
||||
value: &str,
|
||||
) -> Result<OtlpMetricTranslationStrategy, (StatusCode, String)> {
|
||||
value.parse().map_err(|_| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!(
|
||||
"`{}` header value `{}` is invalid. Expected one of: {}.",
|
||||
GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME,
|
||||
value,
|
||||
OtlpMetricTranslationStrategy::VALUES.join(", ")
|
||||
),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn string_value_from_header(
|
||||
headers: &HeaderMap,
|
||||
@@ -208,3 +238,95 @@ fn string_value_from_header(
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum::http::Request;
|
||||
use session::protocol_ctx::OtlpMetricTranslationStrategy;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_otlp_metric_translation_strategy() {
|
||||
let cases = [
|
||||
(
|
||||
"UnderscoreEscapingWithSuffixes",
|
||||
OtlpMetricTranslationStrategy::UnderscoreEscapingWithSuffixes,
|
||||
),
|
||||
(
|
||||
"UnderscoreEscapingWithoutSuffixes",
|
||||
OtlpMetricTranslationStrategy::UnderscoreEscapingWithoutSuffixes,
|
||||
),
|
||||
(
|
||||
"NoUTF8EscapingWithSuffixes",
|
||||
OtlpMetricTranslationStrategy::NoUtf8EscapingWithSuffixes,
|
||||
),
|
||||
(
|
||||
"NoTranslation",
|
||||
OtlpMetricTranslationStrategy::NoTranslation,
|
||||
),
|
||||
];
|
||||
|
||||
for (value, expected) in cases {
|
||||
assert_eq!(
|
||||
parse_otlp_metric_translation_strategy(value).unwrap(),
|
||||
expected
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_otlp_metric_translation_strategy_rejects_invalid_value() {
|
||||
let err = parse_otlp_metric_translation_strategy("no_translation").unwrap_err();
|
||||
assert_eq!(err.0, StatusCode::BAD_REQUEST);
|
||||
assert!(
|
||||
err.1
|
||||
.contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME)
|
||||
);
|
||||
assert!(err.1.contains("no_translation"));
|
||||
assert!(err.1.contains("NoTranslation"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_otlp_metric_options_extracts_translation_strategy() {
|
||||
let (mut parts, _) = Request::builder()
|
||||
.header(
|
||||
GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME,
|
||||
"NoTranslation",
|
||||
)
|
||||
.body(())
|
||||
.unwrap()
|
||||
.into_parts();
|
||||
|
||||
let opts = OtlpMetricOptions::from_request_parts(&mut parts, &())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
opts.metric_translation_strategy,
|
||||
OtlpMetricTranslationStrategy::NoTranslation
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_otlp_metric_options_rejects_invalid_translation_strategy() {
|
||||
let (mut parts, _) = Request::builder()
|
||||
.header(
|
||||
GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME,
|
||||
"no_translation",
|
||||
)
|
||||
.body(())
|
||||
.unwrap()
|
||||
.into_parts();
|
||||
|
||||
let err = match OtlpMetricOptions::from_request_parts(&mut parts, &()).await {
|
||||
Ok(_) => panic!("invalid metric translation strategy should be rejected"),
|
||||
Err(err) => err,
|
||||
};
|
||||
assert_eq!(err.0, StatusCode::BAD_REQUEST);
|
||||
assert!(
|
||||
err.1
|
||||
.contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME)
|
||||
);
|
||||
assert!(err.1.contains("no_translation"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,8 @@ pub mod constants {
|
||||
"x-greptime-otlp-metric-ignore-resource-attrs";
|
||||
pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str =
|
||||
"x-greptime-otlp-metric-promote-scope-attrs";
|
||||
pub const GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME: &str =
|
||||
"x-greptime-otlp-metric-translation-strategy";
|
||||
|
||||
/// The header key that contains the pipeline params.
|
||||
pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params";
|
||||
|
||||
@@ -114,6 +114,7 @@ pub async fn metrics(
|
||||
// set is_legacy later
|
||||
is_legacy: false,
|
||||
metric_type: MetricType::Init,
|
||||
metric_translation_strategy: http_opts.metric_translation_strategy,
|
||||
}));
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use ahash::{HashMap, HashSet};
|
||||
use ahash::HashSet;
|
||||
use api::v1::{RowInsertRequests, Value};
|
||||
use common_grpc::precision::Precision;
|
||||
use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
|
||||
@@ -20,13 +20,17 @@ use lazy_static::lazy_static;
|
||||
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
|
||||
use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
|
||||
use regex::Regex;
|
||||
use session::protocol_ctx::{MetricType, OtlpMetricCtx};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
|
||||
mod translator;
|
||||
|
||||
pub use translator::legacy_normalize_otlp_name;
|
||||
use translator::{translate_label_name, translate_metric_name};
|
||||
|
||||
/// the default column count for table writer
|
||||
const APPROXIMATE_COLUMN_COUNT: usize = 8;
|
||||
|
||||
@@ -36,11 +40,6 @@ const SUM_TABLE_SUFFIX: &str = "_sum";
|
||||
const JOB_KEY: &str = "job";
|
||||
const INSTANCE_KEY: &str = "instance";
|
||||
|
||||
const UNDERSCORE: &str = "_";
|
||||
const DOUBLE_UNDERSCORE: &str = "__";
|
||||
const TOTAL: &str = "total";
|
||||
const RATIO: &str = "ratio";
|
||||
|
||||
// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
|
||||
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
|
||||
"service.instance.id",
|
||||
@@ -67,48 +66,6 @@ const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
|
||||
lazy_static! {
|
||||
static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
|
||||
HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
|
||||
static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
|
||||
static ref UNIT_MAP: HashMap<String, String> = [
|
||||
// Time
|
||||
("d", "days"),
|
||||
("h", "hours"),
|
||||
("min", "minutes"),
|
||||
("s", "seconds"),
|
||||
("ms", "milliseconds"),
|
||||
("us", "microseconds"),
|
||||
("ns", "nanoseconds"),
|
||||
// Bytes
|
||||
("By", "bytes"),
|
||||
("KiBy", "kibibytes"),
|
||||
("MiBy", "mebibytes"),
|
||||
("GiBy", "gibibytes"),
|
||||
("TiBy", "tibibytes"),
|
||||
("KBy", "kilobytes"),
|
||||
("MBy", "megabytes"),
|
||||
("GBy", "gigabytes"),
|
||||
("TBy", "terabytes"),
|
||||
// SI
|
||||
("m", "meters"),
|
||||
("V", "volts"),
|
||||
("A", "amperes"),
|
||||
("J", "joules"),
|
||||
("W", "watts"),
|
||||
("g", "grams"),
|
||||
// Misc
|
||||
("Cel", "celsius"),
|
||||
("Hz", "hertz"),
|
||||
("1", ""),
|
||||
("%", "percent"),
|
||||
].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
static ref PER_UNIT_MAP: HashMap<String, String> = [
|
||||
("s", "second"),
|
||||
("m", "minute"),
|
||||
("h", "hour"),
|
||||
("d", "day"),
|
||||
("w", "week"),
|
||||
("mo", "month"),
|
||||
("y", "year"),
|
||||
].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
}
|
||||
|
||||
const OTEL_SCOPE_NAME: &str = "name";
|
||||
@@ -248,120 +205,6 @@ fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Opti
|
||||
})
|
||||
}
|
||||
|
||||
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55
|
||||
pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
|
||||
// Split metric name in "tokens" (remove all non-alphanumeric), filtering out empty strings
|
||||
let mut name_tokens: Vec<String> = NON_ALPHA_NUM_CHAR
|
||||
.split(&metric.name)
|
||||
.filter_map(|s| {
|
||||
let trimmed = s.trim();
|
||||
if trimmed.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(trimmed.to_string())
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Append unit if it exists
|
||||
if !metric.unit.is_empty() {
|
||||
let (main, per) = build_unit_suffix(&metric.unit);
|
||||
if let Some(main) = main
|
||||
&& !name_tokens.contains(&main)
|
||||
{
|
||||
name_tokens.push(main);
|
||||
}
|
||||
if let Some(per) = per
|
||||
&& !name_tokens.contains(&per)
|
||||
{
|
||||
name_tokens.push("per".to_string());
|
||||
name_tokens.push(per);
|
||||
}
|
||||
}
|
||||
|
||||
// Append _total for Counters (monotonic sums)
|
||||
if matches!(metric_type, MetricType::MonotonicSum) {
|
||||
// Remove existing "total" tokens first, then append
|
||||
name_tokens.retain(|t| t != TOTAL);
|
||||
name_tokens.push(TOTAL.to_string());
|
||||
}
|
||||
|
||||
// Append _ratio for metrics with unit "1" (gauges only)
|
||||
if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
|
||||
// Remove existing "ratio" tokens first, then append
|
||||
name_tokens.retain(|t| t != RATIO);
|
||||
name_tokens.push(RATIO.to_string());
|
||||
}
|
||||
|
||||
// Build the string from the tokens, separated with underscores
|
||||
let name = name_tokens.join(UNDERSCORE);
|
||||
|
||||
// Metric name cannot start with a digit, so prefix it with "_" in this case
|
||||
if let Some((_, first)) = name.char_indices().next()
|
||||
&& first.is_ascii_digit()
|
||||
{
|
||||
format!("_{}", name)
|
||||
} else {
|
||||
name
|
||||
}
|
||||
}
|
||||
|
||||
fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
|
||||
let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
|
||||
(check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
|
||||
}
|
||||
|
||||
fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
|
||||
let u = unit_str.trim();
|
||||
// Skip units that are empty, contain "{" or "}" characters
|
||||
if !u.is_empty() && !u.contains('{') && !u.contains('}') {
|
||||
let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
|
||||
let u = clean_unit_name(u);
|
||||
if !u.is_empty() {
|
||||
return Some(u);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn clean_unit_name(name: &str) -> String {
|
||||
// Split on non-alphanumeric characters, filter out empty strings, then join with underscores
|
||||
// This matches the Go implementation: strings.FieldsFunc + strings.Join
|
||||
NON_ALPHA_NUM_CHAR
|
||||
.split(name)
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect::<Vec<&str>>()
|
||||
.join(UNDERSCORE)
|
||||
}
|
||||
|
||||
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27
|
||||
pub fn normalize_label_name(name: &str) -> String {
|
||||
if name.is_empty() {
|
||||
return name.to_string();
|
||||
}
|
||||
|
||||
let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
|
||||
if let Some((_, first)) = n.char_indices().next()
|
||||
&& first.is_ascii_digit()
|
||||
{
|
||||
return format!("key_{}", n);
|
||||
}
|
||||
if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
|
||||
return format!("key{}", n);
|
||||
}
|
||||
n.to_string()
|
||||
}
|
||||
|
||||
/// Normalize otlp instrumentation, metric and attribute names
|
||||
///
|
||||
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
|
||||
/// - since the name are case-insensitive, we transform them to lowercase for
|
||||
/// better sql usability
|
||||
/// - replace `.` and `-` with `_`
|
||||
pub fn legacy_normalize_otlp_name(name: &str) -> String {
|
||||
name.to_lowercase().replace(['.', '-'], "_")
|
||||
}
|
||||
|
||||
fn encode_metrics(
|
||||
table_writer: &mut MultiTableData,
|
||||
metric: &Metric,
|
||||
@@ -372,7 +215,11 @@ fn encode_metrics(
|
||||
let name = if metric_ctx.is_legacy {
|
||||
legacy_normalize_otlp_name(&metric.name)
|
||||
} else {
|
||||
normalize_metric_name(metric, &metric_ctx.metric_type)
|
||||
translate_metric_name(
|
||||
metric,
|
||||
&metric_ctx.metric_type,
|
||||
metric_ctx.metric_translation_strategy,
|
||||
)
|
||||
};
|
||||
|
||||
// note that we don't store description or unit, we might want to deal with
|
||||
@@ -440,6 +287,7 @@ fn write_attributes(
|
||||
row: &mut Vec<Value>,
|
||||
attrs: Option<&Vec<KeyValue>>,
|
||||
attribute_type: AttributeType,
|
||||
metric_ctx: &OtlpMetricCtx,
|
||||
) -> Result<()> {
|
||||
let Some(attrs) = attrs else {
|
||||
return Ok(());
|
||||
@@ -452,10 +300,13 @@ fn write_attributes(
|
||||
.and_then(|val| {
|
||||
let key = match attribute_type {
|
||||
AttributeType::Resource | AttributeType::DataPoint => {
|
||||
normalize_label_name(&attr.key)
|
||||
translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
|
||||
}
|
||||
AttributeType::Scope => {
|
||||
format!("otel_scope_{}", normalize_label_name(&attr.key))
|
||||
format!(
|
||||
"otel_scope_{}",
|
||||
translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
|
||||
)
|
||||
}
|
||||
AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
|
||||
};
|
||||
@@ -526,14 +377,38 @@ fn write_tags_and_timestamp(
|
||||
metric_ctx: &OtlpMetricCtx,
|
||||
) -> Result<()> {
|
||||
if metric_ctx.is_legacy {
|
||||
write_attributes(table, row, resource_attrs, AttributeType::Legacy)?;
|
||||
write_attributes(table, row, scope_attrs, AttributeType::Legacy)?;
|
||||
write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?;
|
||||
write_attributes(
|
||||
table,
|
||||
row,
|
||||
resource_attrs,
|
||||
AttributeType::Legacy,
|
||||
metric_ctx,
|
||||
)?;
|
||||
write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?;
|
||||
write_attributes(
|
||||
table,
|
||||
row,
|
||||
data_point_attrs,
|
||||
AttributeType::Legacy,
|
||||
metric_ctx,
|
||||
)?;
|
||||
} else {
|
||||
// TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus
|
||||
write_attributes(table, row, resource_attrs, AttributeType::Resource)?;
|
||||
write_attributes(table, row, scope_attrs, AttributeType::Scope)?;
|
||||
write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?;
|
||||
write_attributes(
|
||||
table,
|
||||
row,
|
||||
resource_attrs,
|
||||
AttributeType::Resource,
|
||||
metric_ctx,
|
||||
)?;
|
||||
write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?;
|
||||
write_attributes(
|
||||
table,
|
||||
row,
|
||||
data_point_attrs,
|
||||
AttributeType::DataPoint,
|
||||
metric_ctx,
|
||||
)?;
|
||||
}
|
||||
|
||||
write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
|
||||
@@ -880,570 +755,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_legacy_normalize_otlp_name() {
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm.memory.free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm-memory-free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm_memory_free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("JVM_memory_FREE"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_metric_name() {
|
||||
let test_cases = vec![
|
||||
// Default case
|
||||
(Metric::default(), MetricType::Init, ""),
|
||||
// Basic metric with just name
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo",
|
||||
),
|
||||
// Metric with unit "s" should append "seconds"
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_seconds",
|
||||
),
|
||||
// Metric already ending with unit suffix should not duplicate
|
||||
(
|
||||
Metric {
|
||||
name: "foo_seconds".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_seconds",
|
||||
),
|
||||
// Monotonic sum should append "total"
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_total",
|
||||
),
|
||||
// Metric already ending with "total" should not duplicate
|
||||
(
|
||||
Metric {
|
||||
name: "foo_total".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_total",
|
||||
),
|
||||
// Monotonic sum with unit should append both unit and "total"
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_seconds_total",
|
||||
),
|
||||
// Metric with unit suffix and monotonic sum
|
||||
(
|
||||
Metric {
|
||||
name: "foo_seconds".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_seconds_total",
|
||||
),
|
||||
// Metric already ending with "total" and has unit
|
||||
(
|
||||
Metric {
|
||||
name: "foo_total".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_seconds_total",
|
||||
),
|
||||
// Metric already ending with both unit and "total"
|
||||
(
|
||||
Metric {
|
||||
name: "foo_seconds_total".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_seconds_total",
|
||||
),
|
||||
// Metric with unusual order (total_seconds) should be normalized
|
||||
(
|
||||
Metric {
|
||||
name: "foo_total_seconds".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"foo_seconds_total",
|
||||
),
|
||||
// Gauge with unit "1" should append "ratio"
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
unit: "1".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"foo_ratio",
|
||||
),
|
||||
// Complex unit like "m/s" should be converted to "meters_per_second"
|
||||
(
|
||||
Metric {
|
||||
name: "foo".to_string(),
|
||||
unit: "m/s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_meters_per_second",
|
||||
),
|
||||
// Metric with partial unit match
|
||||
(
|
||||
Metric {
|
||||
name: "foo_second".to_string(),
|
||||
unit: "m/s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_second_meters",
|
||||
),
|
||||
// Metric already containing the main unit
|
||||
(
|
||||
Metric {
|
||||
name: "foo_meters".to_string(),
|
||||
unit: "m/s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_meters_per_second",
|
||||
),
|
||||
];
|
||||
|
||||
for (metric, metric_type, expected) in test_cases {
|
||||
let result = normalize_metric_name(&metric, &metric_type);
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"Failed for metric name: '{}', unit: '{}', type: {:?}",
|
||||
metric.name, metric.unit, metric_type
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_metric_name_edge_cases() {
|
||||
let test_cases = vec![
|
||||
// Edge case: name with multiple non-alphanumeric chars in a row
|
||||
(
|
||||
Metric {
|
||||
name: "foo--bar__baz".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_bar_baz",
|
||||
),
|
||||
// Edge case: name starting and ending with non-alphanumeric
|
||||
(
|
||||
Metric {
|
||||
name: "-foo_bar-".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"foo_bar",
|
||||
),
|
||||
// Edge case: name with only special chars (should be empty)
|
||||
(
|
||||
Metric {
|
||||
name: "--___--".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"",
|
||||
),
|
||||
// Edge case: name starting with digit
|
||||
(
|
||||
Metric {
|
||||
name: "2xx_requests".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Init,
|
||||
"_2xx_requests",
|
||||
),
|
||||
];
|
||||
|
||||
for (metric, metric_type, expected) in test_cases {
|
||||
let result = normalize_metric_name(&metric, &metric_type);
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"Failed for metric name: '{}', unit: '{}', type: {:?}",
|
||||
metric.name, metric.unit, metric_type
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_label_name() {
|
||||
let test_cases = vec![
|
||||
("", ""),
|
||||
("foo", "foo"),
|
||||
("foo_bar/baz:abc", "foo_bar_baz_abc"),
|
||||
("1foo", "key_1foo"),
|
||||
("_foo", "key_foo"),
|
||||
("__bar", "__bar"),
|
||||
];
|
||||
|
||||
for (input, expected) in test_cases {
|
||||
let result = normalize_label_name(input);
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"unexpected result for input '{}'; got '{}'; want '{}'",
|
||||
input, result, expected
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clean_unit_name() {
|
||||
// Test the improved clean_unit_name function
|
||||
assert_eq!(clean_unit_name("faults"), "faults");
|
||||
assert_eq!(clean_unit_name("{faults}"), "faults"); // clean_unit_name still processes braces internally
|
||||
assert_eq!(clean_unit_name("req/sec"), "req_sec");
|
||||
assert_eq!(clean_unit_name("m/s"), "m_s");
|
||||
assert_eq!(clean_unit_name("___test___"), "test");
|
||||
assert_eq!(
|
||||
clean_unit_name("multiple__underscores"),
|
||||
"multiple_underscores"
|
||||
);
|
||||
assert_eq!(clean_unit_name(""), "");
|
||||
assert_eq!(clean_unit_name("___"), "");
|
||||
assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_metric_name_braced_units() {
|
||||
// Test that units with braces are rejected (not processed)
|
||||
let test_cases = vec![
|
||||
(
|
||||
Metric {
|
||||
name: "test.metric".to_string(),
|
||||
unit: "{faults}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"test_metric_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "test.metric".to_string(),
|
||||
unit: "{operations}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"test_metric", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "test.metric".to_string(),
|
||||
unit: "{}".to_string(), // empty braces should be ignored due to contains('{') || contains('}')
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"test_metric",
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "test.metric".to_string(),
|
||||
unit: "faults".to_string(), // no braces, should work normally
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"test_metric_faults",
|
||||
),
|
||||
];
|
||||
|
||||
for (metric, metric_type, expected) in test_cases {
|
||||
let result = normalize_metric_name(&metric, &metric_type);
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
|
||||
metric.name, metric.unit, metric_type, result, expected
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_metric_name_with_testdata() {
|
||||
// Test cases extracted from real OTLP metrics data from testdata.txt
|
||||
let test_cases = vec![
|
||||
// Basic system metrics with various units
|
||||
(
|
||||
Metric {
|
||||
name: "system.paging.faults".to_string(),
|
||||
unit: "{faults}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_paging_faults_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.paging.operations".to_string(),
|
||||
unit: "{operations}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_paging_operations_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.paging.usage".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_paging_usage_bytes",
|
||||
),
|
||||
// Load average metrics - gauge with custom unit
|
||||
(
|
||||
Metric {
|
||||
name: "system.cpu.load_average.15m".to_string(),
|
||||
unit: "{thread}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"system_cpu_load_average_15m", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.cpu.load_average.1m".to_string(),
|
||||
unit: "{thread}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"system_cpu_load_average_1m", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
// Disk I/O with bytes unit
|
||||
(
|
||||
Metric {
|
||||
name: "system.disk.io".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_disk_io_bytes_total",
|
||||
),
|
||||
// Time-based metrics with seconds unit
|
||||
(
|
||||
Metric {
|
||||
name: "system.disk.io_time".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_disk_io_time_seconds_total",
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.disk.operation_time".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_disk_operation_time_seconds_total",
|
||||
),
|
||||
// CPU time metric
|
||||
(
|
||||
Metric {
|
||||
name: "system.cpu.time".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_cpu_time_seconds_total",
|
||||
),
|
||||
// Process counts
|
||||
(
|
||||
Metric {
|
||||
name: "system.processes.count".to_string(),
|
||||
unit: "{processes}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_processes_count", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.processes.created".to_string(),
|
||||
unit: "{processes}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_processes_created_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
// Memory usage with bytes
|
||||
(
|
||||
Metric {
|
||||
name: "system.memory.usage".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_memory_usage_bytes",
|
||||
),
|
||||
// Uptime as gauge
|
||||
(
|
||||
Metric {
|
||||
name: "system.uptime".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"system_uptime_seconds",
|
||||
),
|
||||
// Network metrics
|
||||
(
|
||||
Metric {
|
||||
name: "system.network.connections".to_string(),
|
||||
unit: "{connections}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_network_connections", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.network.dropped".to_string(),
|
||||
unit: "{packets}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_network_dropped_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.network.errors".to_string(),
|
||||
unit: "{errors}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_network_errors_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.network.io".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_network_io_bytes_total",
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.network.packets".to_string(),
|
||||
unit: "{packets}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"system_network_packets_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
// Filesystem metrics
|
||||
(
|
||||
Metric {
|
||||
name: "system.filesystem.inodes.usage".to_string(),
|
||||
unit: "{inodes}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_filesystem_inodes_usage", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "system.filesystem.usage".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::NonMonotonicSum,
|
||||
"system_filesystem_usage_bytes",
|
||||
),
|
||||
// Edge cases with special characters and numbers
|
||||
(
|
||||
Metric {
|
||||
name: "system.load.1".to_string(),
|
||||
unit: "1".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"system_load_1_ratio",
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "http.request.2xx".to_string(),
|
||||
unit: "{requests}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
"http_request_2xx_total", // braced units are rejected, no unit suffix added
|
||||
),
|
||||
// Metric with dots and underscores mixed
|
||||
(
|
||||
Metric {
|
||||
name: "jvm.memory.heap_usage".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"jvm_memory_heap_usage_bytes",
|
||||
),
|
||||
// Complex unit with per-second
|
||||
(
|
||||
Metric {
|
||||
name: "http.request.rate".to_string(),
|
||||
unit: "1/s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
"http_request_rate_per_second",
|
||||
),
|
||||
];
|
||||
|
||||
for (metric, metric_type, expected) in test_cases {
|
||||
let result = normalize_metric_name(&metric, &metric_type);
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
|
||||
metric.name, metric.unit, metric_type, result, expected
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn keyvalue(key: &str, value: &str) -> KeyValue {
|
||||
KeyValue {
|
||||
key: key.into(),
|
||||
|
||||
535
src/servers/src/otlp/metrics/translator.rs
Normal file
535
src/servers/src/otlp/metrics/translator.rs
Normal file
@@ -0,0 +1,535 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use ahash::HashMap;
|
||||
use lazy_static::lazy_static;
|
||||
use otel_arrow_rust::proto::opentelemetry::metrics::v1::Metric;
|
||||
use regex::Regex;
|
||||
use session::protocol_ctx::{MetricType, OtlpMetricTranslationStrategy};
|
||||
|
||||
const UNDERSCORE: &str = "_";
|
||||
const DOUBLE_UNDERSCORE: &str = "__";
|
||||
const TOTAL: &str = "total";
|
||||
const RATIO: &str = "ratio";
|
||||
const PER_PREFIX: &str = "per_";
|
||||
|
||||
lazy_static! {
|
||||
static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
|
||||
static ref UNIT_MAP: HashMap<String, String> = [
|
||||
// Time
|
||||
("d", "days"),
|
||||
("h", "hours"),
|
||||
("min", "minutes"),
|
||||
("s", "seconds"),
|
||||
("ms", "milliseconds"),
|
||||
("us", "microseconds"),
|
||||
("ns", "nanoseconds"),
|
||||
// Bytes
|
||||
("By", "bytes"),
|
||||
("KiBy", "kibibytes"),
|
||||
("MiBy", "mebibytes"),
|
||||
("GiBy", "gibibytes"),
|
||||
("TiBy", "tibibytes"),
|
||||
("KBy", "kilobytes"),
|
||||
("MBy", "megabytes"),
|
||||
("GBy", "gigabytes"),
|
||||
("TBy", "terabytes"),
|
||||
// SI
|
||||
("m", "meters"),
|
||||
("V", "volts"),
|
||||
("A", "amperes"),
|
||||
("J", "joules"),
|
||||
("W", "watts"),
|
||||
("g", "grams"),
|
||||
// Misc
|
||||
("Cel", "celsius"),
|
||||
("Hz", "hertz"),
|
||||
("1", ""),
|
||||
("%", "percent"),
|
||||
]
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect();
|
||||
static ref PER_UNIT_MAP: HashMap<String, String> = [
|
||||
("s", "second"),
|
||||
("m", "minute"),
|
||||
("h", "hour"),
|
||||
("d", "day"),
|
||||
("w", "week"),
|
||||
("mo", "month"),
|
||||
("y", "year"),
|
||||
]
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect();
|
||||
}
|
||||
|
||||
pub fn translate_metric_name(
|
||||
metric: &Metric,
|
||||
metric_type: &MetricType,
|
||||
strategy: OtlpMetricTranslationStrategy,
|
||||
) -> String {
|
||||
match (strategy.should_escape(), strategy.should_add_suffixes()) {
|
||||
(true, true) => normalize_metric_name(metric, metric_type),
|
||||
(true, false) => normalize_metric_name_without_suffixes(&metric.name),
|
||||
(false, true) => build_utf8_metric_name(&metric.name, &metric.unit, metric_type),
|
||||
(false, false) => metric.name.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn translate_label_name(name: &str, strategy: OtlpMetricTranslationStrategy) -> String {
|
||||
if strategy.should_escape() {
|
||||
normalize_label_name(name)
|
||||
} else {
|
||||
name.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55
|
||||
pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
|
||||
normalize_metric_name_with_suffixes(&metric.name, &metric.unit, metric_type)
|
||||
}
|
||||
|
||||
fn normalize_metric_name_with_suffixes(name: &str, unit: &str, metric_type: &MetricType) -> String {
|
||||
let mut name_tokens = metric_name_tokens(name);
|
||||
|
||||
if !unit.is_empty() {
|
||||
let (main, per) = build_clean_unit_suffix(unit);
|
||||
if let Some(main) = main
|
||||
&& !name_tokens.contains(&main)
|
||||
{
|
||||
name_tokens.push(main);
|
||||
}
|
||||
if let Some(per) = per
|
||||
&& !name_tokens.contains(&per)
|
||||
{
|
||||
name_tokens.push("per".to_string());
|
||||
name_tokens.push(per);
|
||||
}
|
||||
}
|
||||
|
||||
if matches!(metric_type, MetricType::MonotonicSum) {
|
||||
name_tokens.retain(|t| t != TOTAL);
|
||||
name_tokens.push(TOTAL.to_string());
|
||||
}
|
||||
|
||||
if unit == "1" && matches!(metric_type, MetricType::Gauge) {
|
||||
name_tokens.retain(|t| t != RATIO);
|
||||
name_tokens.push(RATIO.to_string());
|
||||
}
|
||||
|
||||
prefix_digit_metric_name(name_tokens.join(UNDERSCORE))
|
||||
}
|
||||
|
||||
fn normalize_metric_name_without_suffixes(name: &str) -> String {
|
||||
prefix_digit_metric_name(metric_name_tokens(name).join(UNDERSCORE))
|
||||
}
|
||||
|
||||
fn metric_name_tokens(name: &str) -> Vec<String> {
|
||||
NON_ALPHA_NUM_CHAR
|
||||
.split(name)
|
||||
.filter_map(|s| {
|
||||
let trimmed = s.trim();
|
||||
if trimmed.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(trimmed.to_string())
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn prefix_digit_metric_name(name: String) -> String {
|
||||
if let Some((_, first)) = name.char_indices().next()
|
||||
&& first.is_ascii_digit()
|
||||
{
|
||||
format!("_{}", name)
|
||||
} else {
|
||||
name
|
||||
}
|
||||
}
|
||||
|
||||
fn build_utf8_metric_name(input_name: &str, unit: &str, metric_type: &MetricType) -> String {
|
||||
let mut name = input_name.to_string();
|
||||
|
||||
let append_ratio = unit == "1" && matches!(metric_type, MetricType::Gauge);
|
||||
if append_ratio {
|
||||
name = trim_suffix_and_delimiter(&name, RATIO);
|
||||
}
|
||||
|
||||
let append_total = matches!(metric_type, MetricType::MonotonicSum);
|
||||
if append_total {
|
||||
name = trim_suffix_and_delimiter(&name, TOTAL);
|
||||
}
|
||||
|
||||
let (main_unit_suffix, per_unit_suffix) = build_unit_suffixes(unit);
|
||||
let append_per = !per_unit_suffix.is_empty();
|
||||
if append_per {
|
||||
name = trim_suffix_and_delimiter(&name, &per_unit_suffix);
|
||||
}
|
||||
|
||||
if !main_unit_suffix.is_empty() && !name.ends_with(&main_unit_suffix) {
|
||||
name.push('_');
|
||||
name.push_str(&main_unit_suffix);
|
||||
}
|
||||
if append_per {
|
||||
name.push('_');
|
||||
name.push_str(&per_unit_suffix);
|
||||
}
|
||||
if append_total {
|
||||
name.push_str("_total");
|
||||
}
|
||||
if append_ratio {
|
||||
name.push_str("_ratio");
|
||||
}
|
||||
|
||||
name
|
||||
}
|
||||
|
||||
fn trim_suffix_and_delimiter(name: &str, suffix: &str) -> String {
|
||||
name.strip_suffix(suffix)
|
||||
.and_then(|prefix| prefix.strip_suffix('_'))
|
||||
.filter(|prefix| !prefix.is_empty())
|
||||
.unwrap_or(name)
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn build_clean_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
|
||||
let (main, per) = build_unit_suffixes(unit);
|
||||
let main = clean_unit_name(&main);
|
||||
let per = per
|
||||
.strip_prefix(PER_PREFIX)
|
||||
.map(clean_unit_name)
|
||||
.unwrap_or_default();
|
||||
|
||||
(
|
||||
(!main.is_empty()).then_some(main),
|
||||
(!per.is_empty()).then_some(per),
|
||||
)
|
||||
}
|
||||
|
||||
fn build_unit_suffixes(unit: &str) -> (String, String) {
|
||||
let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
|
||||
let main_unit_suffix = unit_suffix(main, &UNIT_MAP);
|
||||
let per_unit_suffix = unit_suffix(per, &PER_UNIT_MAP);
|
||||
|
||||
if per_unit_suffix.is_empty() {
|
||||
(main_unit_suffix, per_unit_suffix)
|
||||
} else {
|
||||
(main_unit_suffix, format!("{PER_PREFIX}{per_unit_suffix}"))
|
||||
}
|
||||
}
|
||||
|
||||
fn unit_suffix(unit_str: &str, unit_map: &HashMap<String, String>) -> String {
|
||||
let unit = unit_str.trim();
|
||||
if unit.is_empty() || unit.contains('{') || unit.contains('}') {
|
||||
return String::new();
|
||||
}
|
||||
|
||||
unit_map
|
||||
.get(unit)
|
||||
.map(|s| s.as_ref())
|
||||
.unwrap_or(unit)
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub(crate) fn clean_unit_name(name: &str) -> String {
|
||||
NON_ALPHA_NUM_CHAR
|
||||
.split(name)
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect::<Vec<&str>>()
|
||||
.join(UNDERSCORE)
|
||||
.trim_matches('_')
|
||||
.to_string()
|
||||
}
|
||||
|
||||
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27
|
||||
pub fn normalize_label_name(name: &str) -> String {
|
||||
if name.is_empty() {
|
||||
return name.to_string();
|
||||
}
|
||||
|
||||
let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
|
||||
if let Some((_, first)) = n.char_indices().next()
|
||||
&& first.is_ascii_digit()
|
||||
{
|
||||
return format!("key_{}", n);
|
||||
}
|
||||
if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
|
||||
return format!("key{}", n);
|
||||
}
|
||||
n.to_string()
|
||||
}
|
||||
|
||||
/// Normalize otlp instrumentation, metric and attribute names
|
||||
///
|
||||
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
|
||||
/// - since the name are case-insensitive, we transform them to lowercase for
|
||||
/// better sql usability
|
||||
/// - replace `.` and `-` with `_`
|
||||
pub fn legacy_normalize_otlp_name(name: &str) -> String {
|
||||
name.to_lowercase().replace(['.', '-'], "_")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use otel_arrow_rust::proto::opentelemetry::metrics::v1::Metric;
|
||||
use session::protocol_ctx::OtlpMetricTranslationStrategy::{
|
||||
NoTranslation, NoUtf8EscapingWithSuffixes, UnderscoreEscapingWithSuffixes,
|
||||
UnderscoreEscapingWithoutSuffixes,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_legacy_normalize_otlp_name() {
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm.memory.free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm-memory-free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("jvm_memory_free"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
assert_eq!(
|
||||
legacy_normalize_otlp_name("JVM_memory_FREE"),
|
||||
"jvm_memory_free"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_metric_name_strategies() {
|
||||
let metric = Metric {
|
||||
name: "http.server.duration_total".to_string(),
|
||||
unit: "s".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
translate_metric_name(
|
||||
&metric,
|
||||
&MetricType::MonotonicSum,
|
||||
UnderscoreEscapingWithSuffixes
|
||||
),
|
||||
"http_server_duration_seconds_total"
|
||||
);
|
||||
assert_eq!(
|
||||
translate_metric_name(
|
||||
&metric,
|
||||
&MetricType::MonotonicSum,
|
||||
UnderscoreEscapingWithoutSuffixes,
|
||||
),
|
||||
"http_server_duration_total"
|
||||
);
|
||||
assert_eq!(
|
||||
translate_metric_name(
|
||||
&metric,
|
||||
&MetricType::MonotonicSum,
|
||||
NoUtf8EscapingWithSuffixes
|
||||
),
|
||||
"http.server.duration_seconds_total"
|
||||
);
|
||||
assert_eq!(
|
||||
translate_metric_name(&metric, &MetricType::MonotonicSum, NoTranslation),
|
||||
"http.server.duration_total"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_metric_name_no_utf8_suffix_ordering() {
|
||||
let metric = Metric {
|
||||
name: "request.rate_per_second_total".to_string(),
|
||||
unit: "1/s".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
translate_metric_name(
|
||||
&metric,
|
||||
&MetricType::MonotonicSum,
|
||||
NoUtf8EscapingWithSuffixes
|
||||
),
|
||||
"request.rate_per_second_total"
|
||||
);
|
||||
|
||||
let metric = Metric {
|
||||
name: "cpu.utilization_ratio".to_string(),
|
||||
unit: "1".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
translate_metric_name(&metric, &MetricType::Gauge, NoUtf8EscapingWithSuffixes),
|
||||
"cpu.utilization_ratio"
|
||||
);
|
||||
|
||||
let metric = Metric {
|
||||
name: "subtotal".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
translate_metric_name(
|
||||
&metric,
|
||||
&MetricType::MonotonicSum,
|
||||
NoUtf8EscapingWithSuffixes
|
||||
),
|
||||
"subtotal_total"
|
||||
);
|
||||
|
||||
let metric = Metric {
|
||||
name: "utilizationratio".to_string(),
|
||||
unit: "1".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
translate_metric_name(&metric, &MetricType::Gauge, NoUtf8EscapingWithSuffixes),
|
||||
"utilizationratio_ratio"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_metric_name_prometheus_style_units_for_all_strategies() {
|
||||
let cases = [
|
||||
(
|
||||
Metric {
|
||||
name: "duration.latency".to_string(),
|
||||
unit: "ms".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
[
|
||||
(
|
||||
UnderscoreEscapingWithSuffixes,
|
||||
"duration_latency_milliseconds",
|
||||
),
|
||||
(UnderscoreEscapingWithoutSuffixes, "duration_latency"),
|
||||
(NoUtf8EscapingWithSuffixes, "duration.latency_milliseconds"),
|
||||
(NoTranslation, "duration.latency"),
|
||||
],
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "disk.io".to_string(),
|
||||
unit: "By".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
[
|
||||
(UnderscoreEscapingWithSuffixes, "disk_io_bytes_total"),
|
||||
(UnderscoreEscapingWithoutSuffixes, "disk_io"),
|
||||
(NoUtf8EscapingWithSuffixes, "disk.io_bytes_total"),
|
||||
(NoTranslation, "disk.io"),
|
||||
],
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "cpu.utilization".to_string(),
|
||||
unit: "%".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
[
|
||||
(UnderscoreEscapingWithSuffixes, "cpu_utilization_percent"),
|
||||
(UnderscoreEscapingWithoutSuffixes, "cpu_utilization"),
|
||||
(NoUtf8EscapingWithSuffixes, "cpu.utilization_percent"),
|
||||
(NoTranslation, "cpu.utilization"),
|
||||
],
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "request.rate".to_string(),
|
||||
unit: "1/s".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::MonotonicSum,
|
||||
[
|
||||
(
|
||||
UnderscoreEscapingWithSuffixes,
|
||||
"request_rate_per_second_total",
|
||||
),
|
||||
(UnderscoreEscapingWithoutSuffixes, "request_rate"),
|
||||
(NoUtf8EscapingWithSuffixes, "request.rate_per_second_total"),
|
||||
(NoTranslation, "request.rate"),
|
||||
],
|
||||
),
|
||||
(
|
||||
Metric {
|
||||
name: "queue.depth".to_string(),
|
||||
unit: "{items}".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
MetricType::Gauge,
|
||||
[
|
||||
(UnderscoreEscapingWithSuffixes, "queue_depth"),
|
||||
(UnderscoreEscapingWithoutSuffixes, "queue_depth"),
|
||||
(NoUtf8EscapingWithSuffixes, "queue.depth"),
|
||||
(NoTranslation, "queue.depth"),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
for (metric, metric_type, expectations) in cases {
|
||||
for (strategy, expected) in expectations {
|
||||
assert_eq!(
|
||||
translate_metric_name(&metric, &metric_type, strategy),
|
||||
expected,
|
||||
"metric: {}, unit: {}, type: {:?}, strategy: {:?}",
|
||||
metric.name,
|
||||
metric.unit,
|
||||
metric_type,
|
||||
strategy
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_label_name_strategies() {
|
||||
assert_eq!(
|
||||
translate_label_name("service.name", UnderscoreEscapingWithSuffixes),
|
||||
"service_name"
|
||||
);
|
||||
assert_eq!(
|
||||
translate_label_name("_foo", UnderscoreEscapingWithoutSuffixes),
|
||||
"key_foo"
|
||||
);
|
||||
assert_eq!(
|
||||
translate_label_name("service.name", NoUtf8EscapingWithSuffixes),
|
||||
"service.name"
|
||||
);
|
||||
assert_eq!(translate_label_name("_foo", NoTranslation), "_foo");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clean_unit_name() {
|
||||
assert_eq!(clean_unit_name("faults"), "faults");
|
||||
assert_eq!(clean_unit_name("{faults}"), "faults");
|
||||
assert_eq!(clean_unit_name("req/sec"), "req_sec");
|
||||
assert_eq!(clean_unit_name("m/s"), "m_s");
|
||||
assert_eq!(clean_unit_name("___test___"), "test");
|
||||
assert_eq!(
|
||||
clean_unit_name("multiple__underscores"),
|
||||
"multiple_underscores"
|
||||
);
|
||||
assert_eq!(clean_unit_name(""), "");
|
||||
assert_eq!(clean_unit_name("___"), "");
|
||||
assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second");
|
||||
}
|
||||
}
|
||||
@@ -55,6 +55,7 @@ pub struct OtlpMetricCtx {
|
||||
pub with_metric_engine: bool,
|
||||
pub is_legacy: bool,
|
||||
pub metric_type: MetricType,
|
||||
pub metric_translation_strategy: OtlpMetricTranslationStrategy,
|
||||
}
|
||||
|
||||
impl OtlpMetricCtx {
|
||||
@@ -75,3 +76,64 @@ pub enum MetricType {
|
||||
ExponentialHistogram,
|
||||
Summary,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub enum OtlpMetricTranslationStrategy {
|
||||
#[default]
|
||||
UnderscoreEscapingWithSuffixes,
|
||||
UnderscoreEscapingWithoutSuffixes,
|
||||
NoUtf8EscapingWithSuffixes,
|
||||
NoTranslation,
|
||||
}
|
||||
|
||||
impl OtlpMetricTranslationStrategy {
|
||||
pub const VALUES: [&'static str; 4] = [
|
||||
"UnderscoreEscapingWithSuffixes",
|
||||
"UnderscoreEscapingWithoutSuffixes",
|
||||
"NoUTF8EscapingWithSuffixes",
|
||||
"NoTranslation",
|
||||
];
|
||||
|
||||
pub fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::UnderscoreEscapingWithSuffixes => "UnderscoreEscapingWithSuffixes",
|
||||
Self::UnderscoreEscapingWithoutSuffixes => "UnderscoreEscapingWithoutSuffixes",
|
||||
Self::NoUtf8EscapingWithSuffixes => "NoUTF8EscapingWithSuffixes",
|
||||
Self::NoTranslation => "NoTranslation",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn should_escape(self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::UnderscoreEscapingWithSuffixes | Self::UnderscoreEscapingWithoutSuffixes
|
||||
)
|
||||
}
|
||||
|
||||
pub fn should_add_suffixes(self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::UnderscoreEscapingWithSuffixes | Self::NoUtf8EscapingWithSuffixes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for OtlpMetricTranslationStrategy {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
match value {
|
||||
"UnderscoreEscapingWithSuffixes" => Ok(Self::UnderscoreEscapingWithSuffixes),
|
||||
"UnderscoreEscapingWithoutSuffixes" => Ok(Self::UnderscoreEscapingWithoutSuffixes),
|
||||
"NoUTF8EscapingWithSuffixes" => Ok(Self::NoUtf8EscapingWithSuffixes),
|
||||
"NoTranslation" => Ok(Self::NoTranslation),
|
||||
_ => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for OtlpMetricTranslationStrategy {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,6 +144,7 @@ macro_rules! http_tests {
|
||||
test_pipeline_index_options,
|
||||
|
||||
test_otlp_metrics_new,
|
||||
test_otlp_metric_translation_strategies,
|
||||
test_otlp_traces_v0,
|
||||
test_otlp_traces_v1,
|
||||
test_otlp_logs,
|
||||
@@ -5217,6 +5218,87 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_metric_translation_strategies(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(store_type, "test_otlp_metric_translation_strategies")
|
||||
.await;
|
||||
|
||||
let content = r#"
|
||||
{"resourceMetrics":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"strategy-service"}},{"key":"resource.attr","value":{"stringValue":"resource-a"}}]},"scopeMetrics":[{"scope":{"name":"test.scope","version":"1.0.0","attributes":[{"key":"scope.attr","value":{"stringValue":"scope-a"}}]},"schemaUrl":"https://example.com/schema","metrics":[{"name":"otel.strategy.duration","description":"strategy test metric","unit":"ms","sum":{"dataPoints":[{"attributes":[{"key":"data.point.label","value":{"stringValue":"duration-a"}}],"startTimeUnixNano":"1000000","timeUnixNano":"2000000","asDouble":42.0}],"aggregationTemporality":2,"isMonotonic":true}}]}]}]}
|
||||
"#;
|
||||
|
||||
let req: ExportMetricsServiceRequest = serde_json::from_str(content).unwrap();
|
||||
let body = req.encode_to_vec();
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
let res = send_req(
|
||||
&client,
|
||||
vec![
|
||||
(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-otlp-metric-translation-strategy"),
|
||||
HeaderValue::from_static("NoTranslation"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"),
|
||||
HeaderValue::from_static("true"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-otlp-metric-promote-all-resource-attrs"),
|
||||
HeaderValue::from_static("true"),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/metrics",
|
||||
body.clone(),
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
let expected = "[[\"otel.strategy.duration\"]]";
|
||||
validate_data(
|
||||
"otlp_metric_translation_strategy_tables",
|
||||
&client,
|
||||
"select table_name from information_schema.tables where table_schema = 'public' and table_name = 'otel.strategy.duration';",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = "[[42.0,\"duration-a\",\"scope-a\",\"resource-a\",\"strategy-service\"]]";
|
||||
validate_data(
|
||||
"otlp_metric_translation_strategy_ingestion",
|
||||
&client,
|
||||
"select greptime_value, \"data.point.label\", \"otel_scope_scope.attr\", \"resource.attr\", job from \"otel.strategy.duration\";",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
let res = send_req(
|
||||
&client,
|
||||
vec![
|
||||
(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-otlp-metric-translation-strategy"),
|
||||
HeaderValue::from_static("no_translation"),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/metrics",
|
||||
body,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::BAD_REQUEST, res.status());
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_traces_v0(store_type: StorageType) {
|
||||
// init
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
Reference in New Issue
Block a user