diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index 4a9ba6e2fb..1e2f9241c2 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -16,7 +16,6 @@ use ahash::{HashMap, HashSet}; use api::v1::{RowInsertRequests, Value}; use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; -use itertools::Itertools; use lazy_static::lazy_static; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use otel_arrow_rust::proto::opentelemetry::common::v1::{any_value, AnyValue, KeyValue}; @@ -251,10 +250,20 @@ fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Opti // See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55 pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String { - let mut name_tokens = NON_ALPHA_NUM_CHAR + // Split metric name in "tokens" (remove all non-alphanumeric), filtering out empty strings + let mut name_tokens: Vec = NON_ALPHA_NUM_CHAR .split(&metric.name) - .map(|s| s.to_string()) - .collect_vec(); + .filter_map(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + } + }) + .collect(); + + // Append unit if it exists if !metric.unit.is_empty() { let (main, per) = build_unit_suffix(&metric.unit); if let Some(main) = main @@ -270,17 +279,24 @@ pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> Strin } } + // Append _total for Counters (monotonic sums) if matches!(metric_type, MetricType::MonotonicSum) { + // Remove existing "total" tokens first, then append name_tokens.retain(|t| t != TOTAL); name_tokens.push(TOTAL.to_string()); } + + // Append _ratio for metrics with unit "1" (gauges only) if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) { + // Remove existing "ratio" tokens first, then append name_tokens.retain(|t| t != RATIO); name_tokens.push(RATIO.to_string()); } + // Build the string from the tokens, separated with underscores let name = name_tokens.join(UNDERSCORE); + // Metric name cannot start with a digit, so prefix it with "_" in this case if let Some((_, first)) = name.char_indices().next() && first >= '0' && first <= '9' @@ -298,7 +314,8 @@ fn build_unit_suffix(unit: &str) -> (Option, Option) { fn check_unit(unit_str: &str, unit_map: &HashMap) -> Option { let u = unit_str.trim(); - if !u.is_empty() && !u.contains("{}") { + // Skip units that are empty, contain "{" or "}" characters + if !u.is_empty() && !u.contains('{') && !u.contains('}') { let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u); let u = clean_unit_name(u); if !u.is_empty() { @@ -309,7 +326,13 @@ fn check_unit(unit_str: &str, unit_map: &HashMap) -> Option String { - NON_ALPHA_NUM_CHAR.split(name).join(UNDERSCORE) + // Split on non-alphanumeric characters, filter out empty strings, then join with underscores + // This matches the Go implementation: strings.FieldsFunc + strings.Join + NON_ALPHA_NUM_CHAR + .split(name) + .filter(|s| !s.is_empty()) + .collect::>() + .join(UNDERSCORE) } // See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27 @@ -1037,6 +1060,57 @@ mod tests { } } + #[test] + fn test_normalize_metric_name_edge_cases() { + let test_cases = vec![ + // Edge case: name with multiple non-alphanumeric chars in a row + ( + Metric { + name: "foo--bar__baz".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_bar_baz", + ), + // Edge case: name starting and ending with non-alphanumeric + ( + Metric { + name: "-foo_bar-".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_bar", + ), + // Edge case: name with only special chars (should be empty) + ( + Metric { + name: "--___--".to_string(), + ..Default::default() + }, + MetricType::Init, + "", + ), + // Edge case: name starting with digit + ( + Metric { + name: "2xx_requests".to_string(), + ..Default::default() + }, + MetricType::Init, + "_2xx_requests", + ), + ]; + + for (metric, metric_type, expected) in test_cases { + let result = normalize_metric_name(&metric, &metric_type); + assert_eq!( + result, expected, + "Failed for metric name: '{}', unit: '{}', type: {:?}", + metric.name, metric.unit, metric_type + ); + } + } + #[test] fn test_normalize_label_name() { let test_cases = vec![ @@ -1058,6 +1132,320 @@ mod tests { } } + #[test] + fn test_clean_unit_name() { + // Test the improved clean_unit_name function + assert_eq!(clean_unit_name("faults"), "faults"); + assert_eq!(clean_unit_name("{faults}"), "faults"); // clean_unit_name still processes braces internally + assert_eq!(clean_unit_name("req/sec"), "req_sec"); + assert_eq!(clean_unit_name("m/s"), "m_s"); + assert_eq!(clean_unit_name("___test___"), "test"); + assert_eq!( + clean_unit_name("multiple__underscores"), + "multiple_underscores" + ); + assert_eq!(clean_unit_name(""), ""); + assert_eq!(clean_unit_name("___"), ""); + assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second"); + } + + #[test] + fn test_normalize_metric_name_braced_units() { + // Test that units with braces are rejected (not processed) + let test_cases = vec![ + ( + Metric { + name: "test.metric".to_string(), + unit: "{faults}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "test_metric_total", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "test.metric".to_string(), + unit: "{operations}".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "test_metric", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "test.metric".to_string(), + unit: "{}".to_string(), // empty braces should be ignored due to contains('{') || contains('}') + ..Default::default() + }, + MetricType::Gauge, + "test_metric", + ), + ( + Metric { + name: "test.metric".to_string(), + unit: "faults".to_string(), // no braces, should work normally + ..Default::default() + }, + MetricType::Gauge, + "test_metric_faults", + ), + ]; + + for (metric, metric_type, expected) in test_cases { + let result = normalize_metric_name(&metric, &metric_type); + assert_eq!( + result, expected, + "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'", + metric.name, metric.unit, metric_type, result, expected + ); + } + } + + #[test] + fn test_normalize_metric_name_with_testdata() { + // Test cases extracted from real OTLP metrics data from testdata.txt + let test_cases = vec![ + // Basic system metrics with various units + ( + Metric { + name: "system.paging.faults".to_string(), + unit: "{faults}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_paging_faults_total", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.paging.operations".to_string(), + unit: "{operations}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_paging_operations_total", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.paging.usage".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_paging_usage_bytes", + ), + // Load average metrics - gauge with custom unit + ( + Metric { + name: "system.cpu.load_average.15m".to_string(), + unit: "{thread}".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "system_cpu_load_average_15m", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.cpu.load_average.1m".to_string(), + unit: "{thread}".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "system_cpu_load_average_1m", // braced units are rejected, no unit suffix added + ), + // Disk I/O with bytes unit + ( + Metric { + name: "system.disk.io".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_disk_io_bytes_total", + ), + // Time-based metrics with seconds unit + ( + Metric { + name: "system.disk.io_time".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_disk_io_time_seconds_total", + ), + ( + Metric { + name: "system.disk.operation_time".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_disk_operation_time_seconds_total", + ), + // CPU time metric + ( + Metric { + name: "system.cpu.time".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_cpu_time_seconds_total", + ), + // Process counts + ( + Metric { + name: "system.processes.count".to_string(), + unit: "{processes}".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_processes_count", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.processes.created".to_string(), + unit: "{processes}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_processes_created_total", // braced units are rejected, no unit suffix added + ), + // Memory usage with bytes + ( + Metric { + name: "system.memory.usage".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_memory_usage_bytes", + ), + // Uptime as gauge + ( + Metric { + name: "system.uptime".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "system_uptime_seconds", + ), + // Network metrics + ( + Metric { + name: "system.network.connections".to_string(), + unit: "{connections}".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_network_connections", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.network.dropped".to_string(), + unit: "{packets}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_network_dropped_total", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.network.errors".to_string(), + unit: "{errors}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_network_errors_total", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.network.io".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_network_io_bytes_total", + ), + ( + Metric { + name: "system.network.packets".to_string(), + unit: "{packets}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "system_network_packets_total", // braced units are rejected, no unit suffix added + ), + // Filesystem metrics + ( + Metric { + name: "system.filesystem.inodes.usage".to_string(), + unit: "{inodes}".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_filesystem_inodes_usage", // braced units are rejected, no unit suffix added + ), + ( + Metric { + name: "system.filesystem.usage".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::NonMonotonicSum, + "system_filesystem_usage_bytes", + ), + // Edge cases with special characters and numbers + ( + Metric { + name: "system.load.1".to_string(), + unit: "1".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "system_load_1_ratio", + ), + ( + Metric { + name: "http.request.2xx".to_string(), + unit: "{requests}".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "http_request_2xx_total", // braced units are rejected, no unit suffix added + ), + // Metric with dots and underscores mixed + ( + Metric { + name: "jvm.memory.heap_usage".to_string(), + unit: "By".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "jvm_memory_heap_usage_bytes", + ), + // Complex unit with per-second + ( + Metric { + name: "http.request.rate".to_string(), + unit: "1/s".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "http_request_rate_per_second", + ), + ]; + + for (metric, metric_type, expected) in test_cases { + let result = normalize_metric_name(&metric, &metric_type); + assert_eq!( + result, expected, + "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'", + metric.name, metric.unit, metric_type, result, expected + ); + } + } + fn keyvalue(key: &str, value: &str) -> KeyValue { KeyValue { key: key.into(),