diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 9572ea4df1..aeb31a3796 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -15,7 +15,9 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::precision::Precision; +use itertools::Itertools; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value; use self::span::{parse_span, TraceSpan, TraceSpans}; use crate::error::Result; @@ -34,16 +36,39 @@ pub mod span; /// /// for data structure of OTLP traces. pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { - let mut spans = vec![]; + let span_size = request + .resource_spans + .iter() + .flat_map(|res| res.scope_spans.iter()) + .flat_map(|scope| scope.spans.iter()) + .count(); + let mut spans = Vec::with_capacity(span_size); for resource_spans in request.resource_spans { let resource_attrs = resource_spans .resource .map(|r| r.attributes) .unwrap_or_default(); + let service_name = resource_attrs + .iter() + .find_or_first(|kv| kv.key == "service.name") + .and_then(|kv| kv.value.clone()) + .and_then(|v| match v.value { + Some(any_value::Value::StringValue(s)) => Some(s), + Some(any_value::Value::BytesValue(b)) => { + Some(String::from_utf8_lossy(&b).to_string()) + } + _ => None, + }); + for scope_spans in resource_spans.scope_spans { let scope = scope_spans.scope.unwrap_or_default(); for span in scope_spans.spans { - spans.push(parse_span(&resource_attrs, &scope, span)); + spans.push(parse_span( + service_name.clone(), + &resource_attrs, + &scope, + span, + )); } } } @@ -96,6 +121,10 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> ]; row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + if let Some(service_name) = span.service_name { + row_writer::write_tag(writer, "service_name", service_name, &mut row)?; + } + // tags let iter = vec![ ("trace_id", span.trace_id), diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index 02fc523f66..a6d810d045 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -27,6 +27,7 @@ use crate::otlp::utils::bytes_to_hex_string; #[derive(Debug, Clone)] pub struct TraceSpan { // the following are tags + pub service_name: Option, pub trace_id: String, pub span_id: String, pub parent_span_id: String, @@ -189,6 +190,7 @@ impl SpanEvents { } pub fn parse_span( + service_name: Option, resource_attrs: &[KeyValue], scope: &InstrumentationScope, span: Span, @@ -196,6 +198,7 @@ pub fn parse_span( let (span_status_code, span_status_message) = status_to_string(&span.status); let span_kind = span.kind().as_str_name().into(); TraceSpan { + service_name, trace_id: bytes_to_hex_string(&span.trace_id), span_id: bytes_to_hex_string(&span.span_id), parent_span_id: bytes_to_hex_string(&span.parent_span_id), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2c912aa0af..3d6aef33fc 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1713,7 +1713,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // select traces data - let expected = r#"[[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; + let expected = r#"[[1726631197820927000,1726631197821050000,123000,"telemetrygen","b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1726631197820927000,1726631197821050000,123000,"telemetrygen","b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; validate_data( "otlp_traces", &client,