diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs index b4222fcfe0..e129ac6c1d 100644 --- a/src/servers/src/http/extractor.rs +++ b/src/servers/src/http/extractor.rs @@ -20,6 +20,7 @@ use axum::http::StatusCode; use axum::http::request::Parts; use http::HeaderMap; use pipeline::{GreptimePipelineParams, SelectInfo, truthy}; +use session::protocol_ctx::OtlpMetricTranslationStrategy; use crate::http::header::constants::{ GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, @@ -27,7 +28,8 @@ use crate::http::header::constants::{ GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME, GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME, GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME, - GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, + GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, + GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, }; @@ -147,6 +149,9 @@ pub struct OtlpMetricOptions { /// Persist scope attributes to the table /// If false, persist none pub promote_scope_attrs: bool, + + /// Metric and label name translation strategy. + pub metric_translation_strategy: OtlpMetricTranslationStrategy, } impl FromRequestParts for OtlpMetricOptions @@ -181,14 +186,39 @@ where .map(truthy) .unwrap_or(false); + let metric_translation_strategy = string_value_from_header( + headers, + &[GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME], + )? + .map(|value| parse_otlp_metric_translation_strategy(&value)) + .transpose()? + .unwrap_or_default(); + Ok(OtlpMetricOptions { promote_all_resource_attrs, resource_attrs, promote_scope_attrs, + metric_translation_strategy, }) } } +fn parse_otlp_metric_translation_strategy( + value: &str, +) -> Result { + value.parse().map_err(|_| { + ( + StatusCode::BAD_REQUEST, + format!( + "`{}` header value `{}` is invalid. Expected one of: {}.", + GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, + value, + OtlpMetricTranslationStrategy::VALUES.join(", ") + ), + ) + }) +} + #[inline] fn string_value_from_header( headers: &HeaderMap, @@ -208,3 +238,95 @@ fn string_value_from_header( Ok(None) } + +#[cfg(test)] +mod tests { + use axum::http::Request; + use session::protocol_ctx::OtlpMetricTranslationStrategy; + + use super::*; + + #[test] + fn test_parse_otlp_metric_translation_strategy() { + let cases = [ + ( + "UnderscoreEscapingWithSuffixes", + OtlpMetricTranslationStrategy::UnderscoreEscapingWithSuffixes, + ), + ( + "UnderscoreEscapingWithoutSuffixes", + OtlpMetricTranslationStrategy::UnderscoreEscapingWithoutSuffixes, + ), + ( + "NoUTF8EscapingWithSuffixes", + OtlpMetricTranslationStrategy::NoUtf8EscapingWithSuffixes, + ), + ( + "NoTranslation", + OtlpMetricTranslationStrategy::NoTranslation, + ), + ]; + + for (value, expected) in cases { + assert_eq!( + parse_otlp_metric_translation_strategy(value).unwrap(), + expected + ); + } + } + + #[test] + fn test_parse_otlp_metric_translation_strategy_rejects_invalid_value() { + let err = parse_otlp_metric_translation_strategy("no_translation").unwrap_err(); + assert_eq!(err.0, StatusCode::BAD_REQUEST); + assert!( + err.1 + .contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME) + ); + assert!(err.1.contains("no_translation")); + assert!(err.1.contains("NoTranslation")); + } + + #[tokio::test] + async fn test_otlp_metric_options_extracts_translation_strategy() { + let (mut parts, _) = Request::builder() + .header( + GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, + "NoTranslation", + ) + .body(()) + .unwrap() + .into_parts(); + + let opts = OtlpMetricOptions::from_request_parts(&mut parts, &()) + .await + .unwrap(); + assert_eq!( + opts.metric_translation_strategy, + OtlpMetricTranslationStrategy::NoTranslation + ); + } + + #[tokio::test] + async fn test_otlp_metric_options_rejects_invalid_translation_strategy() { + let (mut parts, _) = Request::builder() + .header( + GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, + "no_translation", + ) + .body(()) + .unwrap() + .into_parts(); + + let err = match OtlpMetricOptions::from_request_parts(&mut parts, &()).await { + Ok(_) => panic!("invalid metric translation strategy should be rejected"), + Err(err) => err, + }; + assert_eq!(err.0, StatusCode::BAD_REQUEST); + assert!( + err.1 + .contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME) + ); + assert!(err.1.contains("no_translation")); + } +} diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index 785901ad7b..1dc94bb76e 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -68,6 +68,8 @@ pub mod constants { "x-greptime-otlp-metric-ignore-resource-attrs"; pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str = "x-greptime-otlp-metric-promote-scope-attrs"; + pub const GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME: &str = + "x-greptime-otlp-metric-translation-strategy"; /// The header key that contains the pipeline params. pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params"; diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index ba0007bc06..6540f6d4c4 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -114,6 +114,7 @@ pub async fn metrics( // set is_legacy later is_legacy: false, metric_type: MetricType::Init, + metric_translation_strategy: http_opts.metric_translation_strategy, })); let query_ctx = Arc::new(query_ctx); diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index d89cd3f277..4c72f665ee 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::{HashMap, HashSet}; +use ahash::HashSet; use api::v1::{RowInsertRequests, Value}; use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value}; @@ -20,13 +20,17 @@ use lazy_static::lazy_static; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value}; use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *}; -use regex::Regex; use session::protocol_ctx::{MetricType, OtlpMetricCtx}; use crate::error::Result; use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME}; use crate::row_writer::{self, MultiTableData, TableData}; +mod translator; + +pub use translator::legacy_normalize_otlp_name; +use translator::{translate_label_name, translate_metric_name}; + /// the default column count for table writer const APPROXIMATE_COLUMN_COUNT: usize = 8; @@ -36,11 +40,6 @@ const SUM_TABLE_SUFFIX: &str = "_sum"; const JOB_KEY: &str = "job"; const INSTANCE_KEY: &str = "instance"; -const UNDERSCORE: &str = "_"; -const DOUBLE_UNDERSCORE: &str = "__"; -const TOTAL: &str = "total"; -const RATIO: &str = "ratio"; - // see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [ "service.instance.id", @@ -67,48 +66,6 @@ const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [ lazy_static! { static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet = HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string())); - static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap(); - static ref UNIT_MAP: HashMap = [ - // Time - ("d", "days"), - ("h", "hours"), - ("min", "minutes"), - ("s", "seconds"), - ("ms", "milliseconds"), - ("us", "microseconds"), - ("ns", "nanoseconds"), - // Bytes - ("By", "bytes"), - ("KiBy", "kibibytes"), - ("MiBy", "mebibytes"), - ("GiBy", "gibibytes"), - ("TiBy", "tibibytes"), - ("KBy", "kilobytes"), - ("MBy", "megabytes"), - ("GBy", "gigabytes"), - ("TBy", "terabytes"), - // SI - ("m", "meters"), - ("V", "volts"), - ("A", "amperes"), - ("J", "joules"), - ("W", "watts"), - ("g", "grams"), - // Misc - ("Cel", "celsius"), - ("Hz", "hertz"), - ("1", ""), - ("%", "percent"), - ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(); - static ref PER_UNIT_MAP: HashMap = [ - ("s", "second"), - ("m", "minute"), - ("h", "hour"), - ("d", "day"), - ("w", "week"), - ("mo", "month"), - ("y", "year"), - ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(); } const OTEL_SCOPE_NAME: &str = "name"; @@ -248,120 +205,6 @@ fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Opti }) } -// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55 -pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String { - // Split metric name in "tokens" (remove all non-alphanumeric), filtering out empty strings - let mut name_tokens: Vec = NON_ALPHA_NUM_CHAR - .split(&metric.name) - .filter_map(|s| { - let trimmed = s.trim(); - if trimmed.is_empty() { - None - } else { - Some(trimmed.to_string()) - } - }) - .collect(); - - // Append unit if it exists - if !metric.unit.is_empty() { - let (main, per) = build_unit_suffix(&metric.unit); - if let Some(main) = main - && !name_tokens.contains(&main) - { - name_tokens.push(main); - } - if let Some(per) = per - && !name_tokens.contains(&per) - { - name_tokens.push("per".to_string()); - name_tokens.push(per); - } - } - - // Append _total for Counters (monotonic sums) - if matches!(metric_type, MetricType::MonotonicSum) { - // Remove existing "total" tokens first, then append - name_tokens.retain(|t| t != TOTAL); - name_tokens.push(TOTAL.to_string()); - } - - // Append _ratio for metrics with unit "1" (gauges only) - if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) { - // Remove existing "ratio" tokens first, then append - name_tokens.retain(|t| t != RATIO); - name_tokens.push(RATIO.to_string()); - } - - // Build the string from the tokens, separated with underscores - let name = name_tokens.join(UNDERSCORE); - - // Metric name cannot start with a digit, so prefix it with "_" in this case - if let Some((_, first)) = name.char_indices().next() - && first.is_ascii_digit() - { - format!("_{}", name) - } else { - name - } -} - -fn build_unit_suffix(unit: &str) -> (Option, Option) { - let (main, per) = unit.split_once('/').unwrap_or((unit, "")); - (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP)) -} - -fn check_unit(unit_str: &str, unit_map: &HashMap) -> Option { - let u = unit_str.trim(); - // Skip units that are empty, contain "{" or "}" characters - if !u.is_empty() && !u.contains('{') && !u.contains('}') { - let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u); - let u = clean_unit_name(u); - if !u.is_empty() { - return Some(u); - } - } - None -} - -fn clean_unit_name(name: &str) -> String { - // Split on non-alphanumeric characters, filter out empty strings, then join with underscores - // This matches the Go implementation: strings.FieldsFunc + strings.Join - NON_ALPHA_NUM_CHAR - .split(name) - .filter(|s| !s.is_empty()) - .collect::>() - .join(UNDERSCORE) -} - -// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27 -pub fn normalize_label_name(name: &str) -> String { - if name.is_empty() { - return name.to_string(); - } - - let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE); - if let Some((_, first)) = n.char_indices().next() - && first.is_ascii_digit() - { - return format!("key_{}", n); - } - if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) { - return format!("key{}", n); - } - n.to_string() -} - -/// Normalize otlp instrumentation, metric and attribute names -/// -/// -/// - since the name are case-insensitive, we transform them to lowercase for -/// better sql usability -/// - replace `.` and `-` with `_` -pub fn legacy_normalize_otlp_name(name: &str) -> String { - name.to_lowercase().replace(['.', '-'], "_") -} - fn encode_metrics( table_writer: &mut MultiTableData, metric: &Metric, @@ -372,7 +215,11 @@ fn encode_metrics( let name = if metric_ctx.is_legacy { legacy_normalize_otlp_name(&metric.name) } else { - normalize_metric_name(metric, &metric_ctx.metric_type) + translate_metric_name( + metric, + &metric_ctx.metric_type, + metric_ctx.metric_translation_strategy, + ) }; // note that we don't store description or unit, we might want to deal with @@ -440,6 +287,7 @@ fn write_attributes( row: &mut Vec, attrs: Option<&Vec>, attribute_type: AttributeType, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { let Some(attrs) = attrs else { return Ok(()); @@ -452,10 +300,13 @@ fn write_attributes( .and_then(|val| { let key = match attribute_type { AttributeType::Resource | AttributeType::DataPoint => { - normalize_label_name(&attr.key) + translate_label_name(&attr.key, metric_ctx.metric_translation_strategy) } AttributeType::Scope => { - format!("otel_scope_{}", normalize_label_name(&attr.key)) + format!( + "otel_scope_{}", + translate_label_name(&attr.key, metric_ctx.metric_translation_strategy) + ) } AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key), }; @@ -526,14 +377,38 @@ fn write_tags_and_timestamp( metric_ctx: &OtlpMetricCtx, ) -> Result<()> { if metric_ctx.is_legacy { - write_attributes(table, row, resource_attrs, AttributeType::Legacy)?; - write_attributes(table, row, scope_attrs, AttributeType::Legacy)?; - write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?; + write_attributes( + table, + row, + resource_attrs, + AttributeType::Legacy, + metric_ctx, + )?; + write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?; + write_attributes( + table, + row, + data_point_attrs, + AttributeType::Legacy, + metric_ctx, + )?; } else { // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus - write_attributes(table, row, resource_attrs, AttributeType::Resource)?; - write_attributes(table, row, scope_attrs, AttributeType::Scope)?; - write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?; + write_attributes( + table, + row, + resource_attrs, + AttributeType::Resource, + metric_ctx, + )?; + write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?; + write_attributes( + table, + row, + data_point_attrs, + AttributeType::DataPoint, + metric_ctx, + )?; } write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?; @@ -880,570 +755,6 @@ mod tests { use super::*; - #[test] - fn test_legacy_normalize_otlp_name() { - assert_eq!( - legacy_normalize_otlp_name("jvm.memory.free"), - "jvm_memory_free" - ); - assert_eq!( - legacy_normalize_otlp_name("jvm-memory-free"), - "jvm_memory_free" - ); - assert_eq!( - legacy_normalize_otlp_name("jvm_memory_free"), - "jvm_memory_free" - ); - assert_eq!( - legacy_normalize_otlp_name("JVM_MEMORY_FREE"), - "jvm_memory_free" - ); - assert_eq!( - legacy_normalize_otlp_name("JVM_memory_FREE"), - "jvm_memory_free" - ); - } - - #[test] - fn test_normalize_metric_name() { - let test_cases = vec![ - // Default case - (Metric::default(), MetricType::Init, ""), - // Basic metric with just name - ( - Metric { - name: "foo".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo", - ), - // Metric with unit "s" should append "seconds" - ( - Metric { - name: "foo".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_seconds", - ), - // Metric already ending with unit suffix should not duplicate - ( - Metric { - name: "foo_seconds".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_seconds", - ), - // Monotonic sum should append "total" - ( - Metric { - name: "foo".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_total", - ), - // Metric already ending with "total" should not duplicate - ( - Metric { - name: "foo_total".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_total", - ), - // Monotonic sum with unit should append both unit and "total" - ( - Metric { - name: "foo".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_seconds_total", - ), - // Metric with unit suffix and monotonic sum - ( - Metric { - name: "foo_seconds".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_seconds_total", - ), - // Metric already ending with "total" and has unit - ( - Metric { - name: "foo_total".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_seconds_total", - ), - // Metric already ending with both unit and "total" - ( - Metric { - name: "foo_seconds_total".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_seconds_total", - ), - // Metric with unusual order (total_seconds) should be normalized - ( - Metric { - name: "foo_total_seconds".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "foo_seconds_total", - ), - // Gauge with unit "1" should append "ratio" - ( - Metric { - name: "foo".to_string(), - unit: "1".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "foo_ratio", - ), - // Complex unit like "m/s" should be converted to "meters_per_second" - ( - Metric { - name: "foo".to_string(), - unit: "m/s".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_meters_per_second", - ), - // Metric with partial unit match - ( - Metric { - name: "foo_second".to_string(), - unit: "m/s".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_second_meters", - ), - // Metric already containing the main unit - ( - Metric { - name: "foo_meters".to_string(), - unit: "m/s".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_meters_per_second", - ), - ]; - - for (metric, metric_type, expected) in test_cases { - let result = normalize_metric_name(&metric, &metric_type); - assert_eq!( - result, expected, - "Failed for metric name: '{}', unit: '{}', type: {:?}", - metric.name, metric.unit, metric_type - ); - } - } - - #[test] - fn test_normalize_metric_name_edge_cases() { - let test_cases = vec![ - // Edge case: name with multiple non-alphanumeric chars in a row - ( - Metric { - name: "foo--bar__baz".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_bar_baz", - ), - // Edge case: name starting and ending with non-alphanumeric - ( - Metric { - name: "-foo_bar-".to_string(), - ..Default::default() - }, - MetricType::Init, - "foo_bar", - ), - // Edge case: name with only special chars (should be empty) - ( - Metric { - name: "--___--".to_string(), - ..Default::default() - }, - MetricType::Init, - "", - ), - // Edge case: name starting with digit - ( - Metric { - name: "2xx_requests".to_string(), - ..Default::default() - }, - MetricType::Init, - "_2xx_requests", - ), - ]; - - for (metric, metric_type, expected) in test_cases { - let result = normalize_metric_name(&metric, &metric_type); - assert_eq!( - result, expected, - "Failed for metric name: '{}', unit: '{}', type: {:?}", - metric.name, metric.unit, metric_type - ); - } - } - - #[test] - fn test_normalize_label_name() { - let test_cases = vec![ - ("", ""), - ("foo", "foo"), - ("foo_bar/baz:abc", "foo_bar_baz_abc"), - ("1foo", "key_1foo"), - ("_foo", "key_foo"), - ("__bar", "__bar"), - ]; - - for (input, expected) in test_cases { - let result = normalize_label_name(input); - assert_eq!( - result, expected, - "unexpected result for input '{}'; got '{}'; want '{}'", - input, result, expected - ); - } - } - - #[test] - fn test_clean_unit_name() { - // Test the improved clean_unit_name function - assert_eq!(clean_unit_name("faults"), "faults"); - assert_eq!(clean_unit_name("{faults}"), "faults"); // clean_unit_name still processes braces internally - assert_eq!(clean_unit_name("req/sec"), "req_sec"); - assert_eq!(clean_unit_name("m/s"), "m_s"); - assert_eq!(clean_unit_name("___test___"), "test"); - assert_eq!( - clean_unit_name("multiple__underscores"), - "multiple_underscores" - ); - assert_eq!(clean_unit_name(""), ""); - assert_eq!(clean_unit_name("___"), ""); - assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second"); - } - - #[test] - fn test_normalize_metric_name_braced_units() { - // Test that units with braces are rejected (not processed) - let test_cases = vec![ - ( - Metric { - name: "test.metric".to_string(), - unit: "{faults}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "test_metric_total", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "test.metric".to_string(), - unit: "{operations}".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "test_metric", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "test.metric".to_string(), - unit: "{}".to_string(), // empty braces should be ignored due to contains('{') || contains('}') - ..Default::default() - }, - MetricType::Gauge, - "test_metric", - ), - ( - Metric { - name: "test.metric".to_string(), - unit: "faults".to_string(), // no braces, should work normally - ..Default::default() - }, - MetricType::Gauge, - "test_metric_faults", - ), - ]; - - for (metric, metric_type, expected) in test_cases { - let result = normalize_metric_name(&metric, &metric_type); - assert_eq!( - result, expected, - "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'", - metric.name, metric.unit, metric_type, result, expected - ); - } - } - - #[test] - fn test_normalize_metric_name_with_testdata() { - // Test cases extracted from real OTLP metrics data from testdata.txt - let test_cases = vec![ - // Basic system metrics with various units - ( - Metric { - name: "system.paging.faults".to_string(), - unit: "{faults}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_paging_faults_total", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.paging.operations".to_string(), - unit: "{operations}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_paging_operations_total", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.paging.usage".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_paging_usage_bytes", - ), - // Load average metrics - gauge with custom unit - ( - Metric { - name: "system.cpu.load_average.15m".to_string(), - unit: "{thread}".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "system_cpu_load_average_15m", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.cpu.load_average.1m".to_string(), - unit: "{thread}".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "system_cpu_load_average_1m", // braced units are rejected, no unit suffix added - ), - // Disk I/O with bytes unit - ( - Metric { - name: "system.disk.io".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_disk_io_bytes_total", - ), - // Time-based metrics with seconds unit - ( - Metric { - name: "system.disk.io_time".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_disk_io_time_seconds_total", - ), - ( - Metric { - name: "system.disk.operation_time".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_disk_operation_time_seconds_total", - ), - // CPU time metric - ( - Metric { - name: "system.cpu.time".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_cpu_time_seconds_total", - ), - // Process counts - ( - Metric { - name: "system.processes.count".to_string(), - unit: "{processes}".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_processes_count", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.processes.created".to_string(), - unit: "{processes}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_processes_created_total", // braced units are rejected, no unit suffix added - ), - // Memory usage with bytes - ( - Metric { - name: "system.memory.usage".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_memory_usage_bytes", - ), - // Uptime as gauge - ( - Metric { - name: "system.uptime".to_string(), - unit: "s".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "system_uptime_seconds", - ), - // Network metrics - ( - Metric { - name: "system.network.connections".to_string(), - unit: "{connections}".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_network_connections", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.network.dropped".to_string(), - unit: "{packets}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_network_dropped_total", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.network.errors".to_string(), - unit: "{errors}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_network_errors_total", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.network.io".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_network_io_bytes_total", - ), - ( - Metric { - name: "system.network.packets".to_string(), - unit: "{packets}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "system_network_packets_total", // braced units are rejected, no unit suffix added - ), - // Filesystem metrics - ( - Metric { - name: "system.filesystem.inodes.usage".to_string(), - unit: "{inodes}".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_filesystem_inodes_usage", // braced units are rejected, no unit suffix added - ), - ( - Metric { - name: "system.filesystem.usage".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::NonMonotonicSum, - "system_filesystem_usage_bytes", - ), - // Edge cases with special characters and numbers - ( - Metric { - name: "system.load.1".to_string(), - unit: "1".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "system_load_1_ratio", - ), - ( - Metric { - name: "http.request.2xx".to_string(), - unit: "{requests}".to_string(), - ..Default::default() - }, - MetricType::MonotonicSum, - "http_request_2xx_total", // braced units are rejected, no unit suffix added - ), - // Metric with dots and underscores mixed - ( - Metric { - name: "jvm.memory.heap_usage".to_string(), - unit: "By".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "jvm_memory_heap_usage_bytes", - ), - // Complex unit with per-second - ( - Metric { - name: "http.request.rate".to_string(), - unit: "1/s".to_string(), - ..Default::default() - }, - MetricType::Gauge, - "http_request_rate_per_second", - ), - ]; - - for (metric, metric_type, expected) in test_cases { - let result = normalize_metric_name(&metric, &metric_type); - assert_eq!( - result, expected, - "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'", - metric.name, metric.unit, metric_type, result, expected - ); - } - } - fn keyvalue(key: &str, value: &str) -> KeyValue { KeyValue { key: key.into(), diff --git a/src/servers/src/otlp/metrics/translator.rs b/src/servers/src/otlp/metrics/translator.rs new file mode 100644 index 0000000000..bb929cd670 --- /dev/null +++ b/src/servers/src/otlp/metrics/translator.rs @@ -0,0 +1,535 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ahash::HashMap; +use lazy_static::lazy_static; +use otel_arrow_rust::proto::opentelemetry::metrics::v1::Metric; +use regex::Regex; +use session::protocol_ctx::{MetricType, OtlpMetricTranslationStrategy}; + +const UNDERSCORE: &str = "_"; +const DOUBLE_UNDERSCORE: &str = "__"; +const TOTAL: &str = "total"; +const RATIO: &str = "ratio"; +const PER_PREFIX: &str = "per_"; + +lazy_static! { + static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap(); + static ref UNIT_MAP: HashMap = [ + // Time + ("d", "days"), + ("h", "hours"), + ("min", "minutes"), + ("s", "seconds"), + ("ms", "milliseconds"), + ("us", "microseconds"), + ("ns", "nanoseconds"), + // Bytes + ("By", "bytes"), + ("KiBy", "kibibytes"), + ("MiBy", "mebibytes"), + ("GiBy", "gibibytes"), + ("TiBy", "tibibytes"), + ("KBy", "kilobytes"), + ("MBy", "megabytes"), + ("GBy", "gigabytes"), + ("TBy", "terabytes"), + // SI + ("m", "meters"), + ("V", "volts"), + ("A", "amperes"), + ("J", "joules"), + ("W", "watts"), + ("g", "grams"), + // Misc + ("Cel", "celsius"), + ("Hz", "hertz"), + ("1", ""), + ("%", "percent"), + ] + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + static ref PER_UNIT_MAP: HashMap = [ + ("s", "second"), + ("m", "minute"), + ("h", "hour"), + ("d", "day"), + ("w", "week"), + ("mo", "month"), + ("y", "year"), + ] + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); +} + +pub fn translate_metric_name( + metric: &Metric, + metric_type: &MetricType, + strategy: OtlpMetricTranslationStrategy, +) -> String { + match (strategy.should_escape(), strategy.should_add_suffixes()) { + (true, true) => normalize_metric_name(metric, metric_type), + (true, false) => normalize_metric_name_without_suffixes(&metric.name), + (false, true) => build_utf8_metric_name(&metric.name, &metric.unit, metric_type), + (false, false) => metric.name.clone(), + } +} + +pub fn translate_label_name(name: &str, strategy: OtlpMetricTranslationStrategy) -> String { + if strategy.should_escape() { + normalize_label_name(name) + } else { + name.to_string() + } +} + +// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55 +pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String { + normalize_metric_name_with_suffixes(&metric.name, &metric.unit, metric_type) +} + +fn normalize_metric_name_with_suffixes(name: &str, unit: &str, metric_type: &MetricType) -> String { + let mut name_tokens = metric_name_tokens(name); + + if !unit.is_empty() { + let (main, per) = build_clean_unit_suffix(unit); + if let Some(main) = main + && !name_tokens.contains(&main) + { + name_tokens.push(main); + } + if let Some(per) = per + && !name_tokens.contains(&per) + { + name_tokens.push("per".to_string()); + name_tokens.push(per); + } + } + + if matches!(metric_type, MetricType::MonotonicSum) { + name_tokens.retain(|t| t != TOTAL); + name_tokens.push(TOTAL.to_string()); + } + + if unit == "1" && matches!(metric_type, MetricType::Gauge) { + name_tokens.retain(|t| t != RATIO); + name_tokens.push(RATIO.to_string()); + } + + prefix_digit_metric_name(name_tokens.join(UNDERSCORE)) +} + +fn normalize_metric_name_without_suffixes(name: &str) -> String { + prefix_digit_metric_name(metric_name_tokens(name).join(UNDERSCORE)) +} + +fn metric_name_tokens(name: &str) -> Vec { + NON_ALPHA_NUM_CHAR + .split(name) + .filter_map(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + } + }) + .collect() +} + +fn prefix_digit_metric_name(name: String) -> String { + if let Some((_, first)) = name.char_indices().next() + && first.is_ascii_digit() + { + format!("_{}", name) + } else { + name + } +} + +fn build_utf8_metric_name(input_name: &str, unit: &str, metric_type: &MetricType) -> String { + let mut name = input_name.to_string(); + + let append_ratio = unit == "1" && matches!(metric_type, MetricType::Gauge); + if append_ratio { + name = trim_suffix_and_delimiter(&name, RATIO); + } + + let append_total = matches!(metric_type, MetricType::MonotonicSum); + if append_total { + name = trim_suffix_and_delimiter(&name, TOTAL); + } + + let (main_unit_suffix, per_unit_suffix) = build_unit_suffixes(unit); + let append_per = !per_unit_suffix.is_empty(); + if append_per { + name = trim_suffix_and_delimiter(&name, &per_unit_suffix); + } + + if !main_unit_suffix.is_empty() && !name.ends_with(&main_unit_suffix) { + name.push('_'); + name.push_str(&main_unit_suffix); + } + if append_per { + name.push('_'); + name.push_str(&per_unit_suffix); + } + if append_total { + name.push_str("_total"); + } + if append_ratio { + name.push_str("_ratio"); + } + + name +} + +fn trim_suffix_and_delimiter(name: &str, suffix: &str) -> String { + name.strip_suffix(suffix) + .and_then(|prefix| prefix.strip_suffix('_')) + .filter(|prefix| !prefix.is_empty()) + .unwrap_or(name) + .to_string() +} + +fn build_clean_unit_suffix(unit: &str) -> (Option, Option) { + let (main, per) = build_unit_suffixes(unit); + let main = clean_unit_name(&main); + let per = per + .strip_prefix(PER_PREFIX) + .map(clean_unit_name) + .unwrap_or_default(); + + ( + (!main.is_empty()).then_some(main), + (!per.is_empty()).then_some(per), + ) +} + +fn build_unit_suffixes(unit: &str) -> (String, String) { + let (main, per) = unit.split_once('/').unwrap_or((unit, "")); + let main_unit_suffix = unit_suffix(main, &UNIT_MAP); + let per_unit_suffix = unit_suffix(per, &PER_UNIT_MAP); + + if per_unit_suffix.is_empty() { + (main_unit_suffix, per_unit_suffix) + } else { + (main_unit_suffix, format!("{PER_PREFIX}{per_unit_suffix}")) + } +} + +fn unit_suffix(unit_str: &str, unit_map: &HashMap) -> String { + let unit = unit_str.trim(); + if unit.is_empty() || unit.contains('{') || unit.contains('}') { + return String::new(); + } + + unit_map + .get(unit) + .map(|s| s.as_ref()) + .unwrap_or(unit) + .to_string() +} + +pub(crate) fn clean_unit_name(name: &str) -> String { + NON_ALPHA_NUM_CHAR + .split(name) + .filter(|s| !s.is_empty()) + .collect::>() + .join(UNDERSCORE) + .trim_matches('_') + .to_string() +} + +// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27 +pub fn normalize_label_name(name: &str) -> String { + if name.is_empty() { + return name.to_string(); + } + + let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE); + if let Some((_, first)) = n.char_indices().next() + && first.is_ascii_digit() + { + return format!("key_{}", n); + } + if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) { + return format!("key{}", n); + } + n.to_string() +} + +/// Normalize otlp instrumentation, metric and attribute names +/// +/// +/// - since the name are case-insensitive, we transform them to lowercase for +/// better sql usability +/// - replace `.` and `-` with `_` +pub fn legacy_normalize_otlp_name(name: &str) -> String { + name.to_lowercase().replace(['.', '-'], "_") +} + +#[cfg(test)] +mod tests { + use otel_arrow_rust::proto::opentelemetry::metrics::v1::Metric; + use session::protocol_ctx::OtlpMetricTranslationStrategy::{ + NoTranslation, NoUtf8EscapingWithSuffixes, UnderscoreEscapingWithSuffixes, + UnderscoreEscapingWithoutSuffixes, + }; + + use super::*; + + #[test] + fn test_legacy_normalize_otlp_name() { + assert_eq!( + legacy_normalize_otlp_name("jvm.memory.free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("jvm-memory-free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("jvm_memory_free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("JVM_MEMORY_FREE"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("JVM_memory_FREE"), + "jvm_memory_free" + ); + } + + #[test] + fn test_translate_metric_name_strategies() { + let metric = Metric { + name: "http.server.duration_total".to_string(), + unit: "s".to_string(), + ..Default::default() + }; + + assert_eq!( + translate_metric_name( + &metric, + &MetricType::MonotonicSum, + UnderscoreEscapingWithSuffixes + ), + "http_server_duration_seconds_total" + ); + assert_eq!( + translate_metric_name( + &metric, + &MetricType::MonotonicSum, + UnderscoreEscapingWithoutSuffixes, + ), + "http_server_duration_total" + ); + assert_eq!( + translate_metric_name( + &metric, + &MetricType::MonotonicSum, + NoUtf8EscapingWithSuffixes + ), + "http.server.duration_seconds_total" + ); + assert_eq!( + translate_metric_name(&metric, &MetricType::MonotonicSum, NoTranslation), + "http.server.duration_total" + ); + } + + #[test] + fn test_translate_metric_name_no_utf8_suffix_ordering() { + let metric = Metric { + name: "request.rate_per_second_total".to_string(), + unit: "1/s".to_string(), + ..Default::default() + }; + assert_eq!( + translate_metric_name( + &metric, + &MetricType::MonotonicSum, + NoUtf8EscapingWithSuffixes + ), + "request.rate_per_second_total" + ); + + let metric = Metric { + name: "cpu.utilization_ratio".to_string(), + unit: "1".to_string(), + ..Default::default() + }; + assert_eq!( + translate_metric_name(&metric, &MetricType::Gauge, NoUtf8EscapingWithSuffixes), + "cpu.utilization_ratio" + ); + + let metric = Metric { + name: "subtotal".to_string(), + ..Default::default() + }; + assert_eq!( + translate_metric_name( + &metric, + &MetricType::MonotonicSum, + NoUtf8EscapingWithSuffixes + ), + "subtotal_total" + ); + + let metric = Metric { + name: "utilizationratio".to_string(), + unit: "1".to_string(), + ..Default::default() + }; + assert_eq!( + translate_metric_name(&metric, &MetricType::Gauge, NoUtf8EscapingWithSuffixes), + "utilizationratio_ratio" + ); + } + + #[test] + fn test_translate_metric_name_prometheus_style_units_for_all_strategies() { + let cases = [ + ( + Metric { + name: "duration.latency".to_string(), + unit: "ms".to_string(), + ..Default::default() + }, + MetricType::Gauge, + [ + ( + UnderscoreEscapingWithSuffixes, + "duration_latency_milliseconds", + ), + (UnderscoreEscapingWithoutSuffixes, "duration_latency"), + (NoUtf8EscapingWithSuffixes, "duration.latency_milliseconds"), + (NoTranslation, "duration.latency"), + ], + ), + ( + Metric { + name: "disk.io".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + [ + (UnderscoreEscapingWithSuffixes, "disk_io_bytes_total"), + (UnderscoreEscapingWithoutSuffixes, "disk_io"), + (NoUtf8EscapingWithSuffixes, "disk.io_bytes_total"), + (NoTranslation, "disk.io"), + ], + ), + ( + Metric { + name: "cpu.utilization".to_string(), + unit: "%".to_string(), + ..Default::default() + }, + MetricType::Gauge, + [ + (UnderscoreEscapingWithSuffixes, "cpu_utilization_percent"), + (UnderscoreEscapingWithoutSuffixes, "cpu_utilization"), + (NoUtf8EscapingWithSuffixes, "cpu.utilization_percent"), + (NoTranslation, "cpu.utilization"), + ], + ), + ( + Metric { + name: "request.rate".to_string(), + unit: "1/s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + [ + ( + UnderscoreEscapingWithSuffixes, + "request_rate_per_second_total", + ), + (UnderscoreEscapingWithoutSuffixes, "request_rate"), + (NoUtf8EscapingWithSuffixes, "request.rate_per_second_total"), + (NoTranslation, "request.rate"), + ], + ), + ( + Metric { + name: "queue.depth".to_string(), + unit: "{items}".to_string(), + ..Default::default() + }, + MetricType::Gauge, + [ + (UnderscoreEscapingWithSuffixes, "queue_depth"), + (UnderscoreEscapingWithoutSuffixes, "queue_depth"), + (NoUtf8EscapingWithSuffixes, "queue.depth"), + (NoTranslation, "queue.depth"), + ], + ), + ]; + + for (metric, metric_type, expectations) in cases { + for (strategy, expected) in expectations { + assert_eq!( + translate_metric_name(&metric, &metric_type, strategy), + expected, + "metric: {}, unit: {}, type: {:?}, strategy: {:?}", + metric.name, + metric.unit, + metric_type, + strategy + ); + } + } + } + + #[test] + fn test_translate_label_name_strategies() { + assert_eq!( + translate_label_name("service.name", UnderscoreEscapingWithSuffixes), + "service_name" + ); + assert_eq!( + translate_label_name("_foo", UnderscoreEscapingWithoutSuffixes), + "key_foo" + ); + assert_eq!( + translate_label_name("service.name", NoUtf8EscapingWithSuffixes), + "service.name" + ); + assert_eq!(translate_label_name("_foo", NoTranslation), "_foo"); + } + + #[test] + fn test_clean_unit_name() { + assert_eq!(clean_unit_name("faults"), "faults"); + assert_eq!(clean_unit_name("{faults}"), "faults"); + assert_eq!(clean_unit_name("req/sec"), "req_sec"); + assert_eq!(clean_unit_name("m/s"), "m_s"); + assert_eq!(clean_unit_name("___test___"), "test"); + assert_eq!( + clean_unit_name("multiple__underscores"), + "multiple_underscores" + ); + assert_eq!(clean_unit_name(""), ""); + assert_eq!(clean_unit_name("___"), ""); + assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second"); + } +} diff --git a/src/session/src/protocol_ctx.rs b/src/session/src/protocol_ctx.rs index 2cc1b86174..6e84761c2d 100644 --- a/src/session/src/protocol_ctx.rs +++ b/src/session/src/protocol_ctx.rs @@ -55,6 +55,7 @@ pub struct OtlpMetricCtx { pub with_metric_engine: bool, pub is_legacy: bool, pub metric_type: MetricType, + pub metric_translation_strategy: OtlpMetricTranslationStrategy, } impl OtlpMetricCtx { @@ -75,3 +76,64 @@ pub enum MetricType { ExponentialHistogram, Summary, } + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum OtlpMetricTranslationStrategy { + #[default] + UnderscoreEscapingWithSuffixes, + UnderscoreEscapingWithoutSuffixes, + NoUtf8EscapingWithSuffixes, + NoTranslation, +} + +impl OtlpMetricTranslationStrategy { + pub const VALUES: [&'static str; 4] = [ + "UnderscoreEscapingWithSuffixes", + "UnderscoreEscapingWithoutSuffixes", + "NoUTF8EscapingWithSuffixes", + "NoTranslation", + ]; + + pub fn as_str(self) -> &'static str { + match self { + Self::UnderscoreEscapingWithSuffixes => "UnderscoreEscapingWithSuffixes", + Self::UnderscoreEscapingWithoutSuffixes => "UnderscoreEscapingWithoutSuffixes", + Self::NoUtf8EscapingWithSuffixes => "NoUTF8EscapingWithSuffixes", + Self::NoTranslation => "NoTranslation", + } + } + + pub fn should_escape(self) -> bool { + matches!( + self, + Self::UnderscoreEscapingWithSuffixes | Self::UnderscoreEscapingWithoutSuffixes + ) + } + + pub fn should_add_suffixes(self) -> bool { + matches!( + self, + Self::UnderscoreEscapingWithSuffixes | Self::NoUtf8EscapingWithSuffixes + ) + } +} + +impl std::str::FromStr for OtlpMetricTranslationStrategy { + type Err = (); + + fn from_str(value: &str) -> Result { + match value { + "UnderscoreEscapingWithSuffixes" => Ok(Self::UnderscoreEscapingWithSuffixes), + "UnderscoreEscapingWithoutSuffixes" => Ok(Self::UnderscoreEscapingWithoutSuffixes), + "NoUTF8EscapingWithSuffixes" => Ok(Self::NoUtf8EscapingWithSuffixes), + "NoTranslation" => Ok(Self::NoTranslation), + _ => Err(()), + } + } +} + +impl std::fmt::Display for OtlpMetricTranslationStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 66148ab2d2..4cdb4db56d 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -144,6 +144,7 @@ macro_rules! http_tests { test_pipeline_index_options, test_otlp_metrics_new, + test_otlp_metric_translation_strategies, test_otlp_traces_v0, test_otlp_traces_v1, test_otlp_logs, @@ -5217,6 +5218,87 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_otlp_metric_translation_strategies(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_otlp_metric_translation_strategies") + .await; + + let content = r#" +{"resourceMetrics":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"strategy-service"}},{"key":"resource.attr","value":{"stringValue":"resource-a"}}]},"scopeMetrics":[{"scope":{"name":"test.scope","version":"1.0.0","attributes":[{"key":"scope.attr","value":{"stringValue":"scope-a"}}]},"schemaUrl":"https://example.com/schema","metrics":[{"name":"otel.strategy.duration","description":"strategy test metric","unit":"ms","sum":{"dataPoints":[{"attributes":[{"key":"data.point.label","value":{"stringValue":"duration-a"}}],"startTimeUnixNano":"1000000","timeUnixNano":"2000000","asDouble":42.0}],"aggregationTemporality":2,"isMonotonic":true}}]}]}]} + "#; + + let req: ExportMetricsServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + let client = TestClient::new(app).await; + + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-translation-strategy"), + HeaderValue::from_static("NoTranslation"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"), + HeaderValue::from_static("true"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-promote-all-resource-attrs"), + HeaderValue::from_static("true"), + ), + ], + "/v1/otlp/v1/metrics", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = "[[\"otel.strategy.duration\"]]"; + validate_data( + "otlp_metric_translation_strategy_tables", + &client, + "select table_name from information_schema.tables where table_schema = 'public' and table_name = 'otel.strategy.duration';", + expected, + ) + .await; + + let expected = "[[42.0,\"duration-a\",\"scope-a\",\"resource-a\",\"strategy-service\"]]"; + validate_data( + "otlp_metric_translation_strategy_ingestion", + &client, + "select greptime_value, \"data.point.label\", \"otel_scope_scope.attr\", \"resource.attr\", job from \"otel.strategy.duration\";", + expected, + ) + .await; + + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-translation-strategy"), + HeaderValue::from_static("no_translation"), + ), + ], + "/v1/otlp/v1/metrics", + body, + false, + ) + .await; + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + + guard.remove_all().await; +} + pub async fn test_otlp_traces_v0(store_type: StorageType) { // init common_telemetry::init_default_ut_logging();