mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 16:10:02 +00:00
feat(otlp): json attributes via impl Serialeze trait (#2685)
* feat: json attributes via impl Serialeze trait * chore: rename TraceLink to SpanLink * feat: support serialize null value
This commit is contained in:
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
|
||||
use super::trace::TraceSpans;
|
||||
use super::trace::span::TraceSpans;
|
||||
|
||||
/// Transformer helps to transform ExportTraceServiceRequest based on logic, like:
|
||||
/// - uplift some fields from Attributes (Map type) to column
|
||||
|
||||
@@ -12,22 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_grpc::writer::Precision;
|
||||
use common_time::timestamp::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
|
||||
use opentelemetry_proto::tonic::common::v1::{
|
||||
AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
|
||||
};
|
||||
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
|
||||
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use self::span::{parse_span, TraceSpan, TraceSpans};
|
||||
use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use crate::error::Result;
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
@@ -35,68 +25,29 @@ use crate::row_writer::{self, MultiTableData, TableData};
|
||||
const APPROXIMATE_COLUMN_COUNT: usize = 24;
|
||||
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TraceSpan {
|
||||
// the following are tags
|
||||
pub trace_id: String,
|
||||
pub span_id: String,
|
||||
pub parent_span_id: String,
|
||||
pub mod attributes;
|
||||
pub mod span;
|
||||
|
||||
// the following are fields
|
||||
pub resource_attributes: String, // TODO(yuanbohan): Map in the future
|
||||
pub scope_name: String,
|
||||
pub scope_version: String,
|
||||
pub scope_attributes: String, // TODO(yuanbohan): Map in the future
|
||||
pub trace_state: String,
|
||||
pub span_name: String,
|
||||
pub span_kind: String,
|
||||
pub span_status_code: String,
|
||||
pub span_status_message: String,
|
||||
pub span_attributes: String, // TODO(yuanbohan): Map in the future
|
||||
pub span_events: String, // TODO(yuanbohan): List in the future
|
||||
pub span_links: String, // TODO(yuanbohan): List in the future
|
||||
pub start_in_nanosecond: u64, // this is also the Timestamp Index
|
||||
pub end_in_nanosecond: u64,
|
||||
|
||||
pub uplifted_fields: Vec<(String, ColumnDataType, ValueData)>,
|
||||
}
|
||||
|
||||
pub type TraceSpans = Vec<TraceSpan>;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TraceLink {
|
||||
pub trace_id: String,
|
||||
pub span_id: String,
|
||||
pub trace_state: String,
|
||||
pub attributes: String, // TODO(yuanbohan): Map in the future
|
||||
}
|
||||
|
||||
impl From<&Link> for TraceLink {
|
||||
fn from(link: &Link) -> Self {
|
||||
Self {
|
||||
trace_id: bytes_to_hex_string(&link.trace_id),
|
||||
span_id: bytes_to_hex_string(&link.span_id),
|
||||
trace_state: link.trace_state.clone(),
|
||||
attributes: vec_kv_to_string(&link.attributes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SpanEvent {
|
||||
pub name: String,
|
||||
pub time: String,
|
||||
pub attributes: String, // TODO(yuanbohan): Map in the future
|
||||
}
|
||||
|
||||
impl From<&Event> for SpanEvent {
|
||||
fn from(event: &Event) -> Self {
|
||||
Self {
|
||||
name: event.name.clone(),
|
||||
time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
|
||||
attributes: vec_kv_to_string(&event.attributes),
|
||||
/// Convert OpenTelemetry traces to SpanTraces
|
||||
///
|
||||
/// See
|
||||
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
|
||||
/// for data structure of OTLP traces.
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
let mut spans = vec![];
|
||||
for resource_spans in request.resource_spans {
|
||||
let resource_attrs = resource_spans
|
||||
.resource
|
||||
.map(|r| r.attributes)
|
||||
.unwrap_or_default();
|
||||
for scope_spans in resource_spans.scope_spans {
|
||||
let scope = scope_spans.scope.unwrap_or_default();
|
||||
for span in scope_spans.spans {
|
||||
spans.push(parse_span(resource_attrs.clone(), scope.clone(), span));
|
||||
}
|
||||
}
|
||||
}
|
||||
spans
|
||||
}
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
@@ -135,18 +86,18 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
{
|
||||
// fields
|
||||
let str_fields_iter = vec![
|
||||
("resource_attributes", span.resource_attributes),
|
||||
("resource_attributes", span.resource_attributes.to_string()),
|
||||
("scope_name", span.scope_name),
|
||||
("scope_version", span.scope_version),
|
||||
("scope_attributes", span.scope_attributes),
|
||||
("scope_attributes", span.scope_attributes.to_string()),
|
||||
("trace_state", span.trace_state),
|
||||
("span_name", span.span_name),
|
||||
("span_kind", span.span_kind),
|
||||
("span_status_code", span.span_status_code),
|
||||
("span_status_message", span.span_status_message),
|
||||
("span_attributes", span.span_attributes),
|
||||
("span_events", span.span_events),
|
||||
("span_links", span.span_links),
|
||||
("span_attributes", span.span_attributes.to_string()),
|
||||
("span_events", span.span_events.to_string()),
|
||||
("span_links", span.span_links.to_string()),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(col, val)| {
|
||||
@@ -172,7 +123,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
|
||||
row_writer::write_fields(writer, str_fields_iter, &mut row)?;
|
||||
row_writer::write_fields(writer, time_fields_iter, &mut row)?;
|
||||
row_writer::write_fields(writer, span.uplifted_fields.into_iter(), &mut row)?;
|
||||
row_writer::write_fields(writer, span.uplifted_span_attributes.into_iter(), &mut row)?;
|
||||
}
|
||||
|
||||
row_writer::write_f64(
|
||||
@@ -193,271 +144,3 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn parse_span(
|
||||
resource_attrs: &[KeyValue],
|
||||
scope: &InstrumentationScope,
|
||||
span: Span,
|
||||
) -> TraceSpan {
|
||||
let (span_status_code, span_status_message) = status_to_string(&span.status);
|
||||
let span_kind = span.kind().as_str_name().into();
|
||||
TraceSpan {
|
||||
trace_id: bytes_to_hex_string(&span.trace_id),
|
||||
span_id: bytes_to_hex_string(&span.span_id),
|
||||
parent_span_id: bytes_to_hex_string(&span.parent_span_id),
|
||||
|
||||
resource_attributes: vec_kv_to_string(resource_attrs),
|
||||
trace_state: span.trace_state,
|
||||
|
||||
scope_name: scope.name.clone(),
|
||||
scope_version: scope.version.clone(),
|
||||
scope_attributes: vec_kv_to_string(&scope.attributes),
|
||||
|
||||
span_name: span.name,
|
||||
span_kind,
|
||||
span_status_code,
|
||||
span_status_message,
|
||||
span_attributes: vec_kv_to_string(&span.attributes),
|
||||
span_events: events_to_string(&span.events),
|
||||
span_links: links_to_string(&span.links),
|
||||
|
||||
start_in_nanosecond: span.start_time_unix_nano,
|
||||
end_in_nanosecond: span.end_time_unix_nano,
|
||||
|
||||
uplifted_fields: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert OpenTelemetry traces to SpanTraces
|
||||
///
|
||||
/// See
|
||||
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
|
||||
/// for data structure of OTLP traces.
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
let mut spans = vec![];
|
||||
for resource_spans in request.resource_spans {
|
||||
let resource_attrs = resource_spans
|
||||
.resource
|
||||
.map(|r| r.attributes)
|
||||
.unwrap_or_default();
|
||||
for scope_spans in resource_spans.scope_spans {
|
||||
let scope = scope_spans.scope.unwrap_or_default();
|
||||
for span in scope_spans.spans {
|
||||
spans.push(parse_span(&resource_attrs, &scope, span));
|
||||
}
|
||||
}
|
||||
}
|
||||
spans
|
||||
}
|
||||
|
||||
pub fn bytes_to_hex_string(bs: &[u8]) -> String {
|
||||
bs.iter().map(|b| format!("{:02x}", b)).join("")
|
||||
}
|
||||
|
||||
pub fn arr_vals_to_string(arr: &ArrayValue) -> String {
|
||||
let vs: Vec<String> = arr
|
||||
.values
|
||||
.iter()
|
||||
.filter_map(|val| any_value_to_string(val.clone()))
|
||||
.collect();
|
||||
|
||||
serde_json::to_string(&vs).unwrap_or_else(|_| "[]".into())
|
||||
}
|
||||
|
||||
pub fn vec_kv_to_string(vec: &[KeyValue]) -> String {
|
||||
let vs: HashMap<String, String> = vec
|
||||
.iter()
|
||||
.map(|kv| {
|
||||
let val = kv
|
||||
.value
|
||||
.clone()
|
||||
.and_then(any_value_to_string)
|
||||
.unwrap_or_default();
|
||||
(kv.key.clone(), val)
|
||||
})
|
||||
.collect();
|
||||
|
||||
serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into())
|
||||
}
|
||||
|
||||
pub fn kvlist_to_string(kvlist: &KeyValueList) -> String {
|
||||
vec_kv_to_string(&kvlist.values)
|
||||
}
|
||||
|
||||
pub fn any_value_to_string(val: AnyValue) -> Option<String> {
|
||||
val.value.map(|value| match value {
|
||||
OtlpValue::StringValue(s) => s,
|
||||
OtlpValue::BoolValue(b) => b.to_string(),
|
||||
OtlpValue::IntValue(i) => i.to_string(),
|
||||
OtlpValue::DoubleValue(d) => d.to_string(),
|
||||
OtlpValue::ArrayValue(arr) => arr_vals_to_string(&arr),
|
||||
OtlpValue::KvlistValue(kv) => kvlist_to_string(&kv),
|
||||
OtlpValue::BytesValue(bs) => bytes_to_hex_string(&bs),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn events_to_string(events: &[Event]) -> String {
|
||||
let v: Vec<SpanEvent> = events.iter().map(SpanEvent::from).collect();
|
||||
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
|
||||
}
|
||||
|
||||
pub fn links_to_string(links: &[Link]) -> String {
|
||||
let v: Vec<TraceLink> = links.iter().map(TraceLink::from).collect();
|
||||
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
|
||||
}
|
||||
|
||||
pub fn status_to_string(status: &Option<Status>) -> (String, String) {
|
||||
match status {
|
||||
Some(status) => (status.code().as_str_name().into(), status.message.clone()),
|
||||
None => ("".into(), "".into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_time::timestamp::Timestamp;
|
||||
use opentelemetry_proto::tonic::common::v1::{
|
||||
any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
|
||||
};
|
||||
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
|
||||
use opentelemetry_proto::tonic::trace::v1::Status;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::otlp::trace::{
|
||||
arr_vals_to_string, bytes_to_hex_string, events_to_string, kvlist_to_string,
|
||||
links_to_string, status_to_string, vec_kv_to_string, SpanEvent, TraceLink,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_hex_string() {
|
||||
assert_eq!(
|
||||
"24fe79948641b110a29bc27859307e8d",
|
||||
bytes_to_hex_string(&[
|
||||
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
|
||||
])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"baffeedd7b8debc0",
|
||||
bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_arr_vals_to_string() {
|
||||
assert_eq!("[]", arr_vals_to_string(&ArrayValue { values: vec![] }));
|
||||
|
||||
let arr = ArrayValue {
|
||||
values: vec![
|
||||
AnyValue {
|
||||
value: Some(any_value::Value::StringValue("string_value".into())),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(any_value::Value::BoolValue(true)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(any_value::Value::IntValue(1)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(any_value::Value::DoubleValue(1.2)),
|
||||
},
|
||||
],
|
||||
};
|
||||
let expect = json!(["string_value", "true", "1", "1.2"]).to_string();
|
||||
assert_eq!(expect, arr_vals_to_string(&arr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_kv_list_to_string() {
|
||||
let kvlist = KeyValueList {
|
||||
values: vec![KeyValue {
|
||||
key: "str_key".into(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(any_value::Value::StringValue("val1".into())),
|
||||
}),
|
||||
}],
|
||||
};
|
||||
let expect = json!({
|
||||
"str_key": "val1",
|
||||
})
|
||||
.to_string();
|
||||
assert_eq!(expect, kvlist_to_string(&kvlist))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_links_to_string() {
|
||||
let trace_id = vec![
|
||||
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
|
||||
];
|
||||
let span_id = vec![186, 255, 238, 221, 123, 141, 235, 192];
|
||||
let trace_state = "OK".to_string();
|
||||
let link_attributes = vec![KeyValue {
|
||||
key: "str_key".into(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(any_value::Value::StringValue("val1".into())),
|
||||
}),
|
||||
}];
|
||||
|
||||
let trace_links = vec![TraceLink {
|
||||
trace_id: bytes_to_hex_string(&trace_id),
|
||||
span_id: bytes_to_hex_string(&span_id),
|
||||
trace_state: trace_state.clone(),
|
||||
attributes: vec_kv_to_string(&link_attributes),
|
||||
}];
|
||||
let expect_string = serde_json::to_string(&trace_links).unwrap_or_default();
|
||||
|
||||
let links = vec![Link {
|
||||
trace_id,
|
||||
span_id,
|
||||
trace_state,
|
||||
attributes: link_attributes,
|
||||
dropped_attributes_count: 0,
|
||||
}];
|
||||
let links_string = links_to_string(&links);
|
||||
|
||||
assert_eq!(expect_string, links_string);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_events_to_string() {
|
||||
let time_unix_nano = 1697620662450128000_u64;
|
||||
let event_name = "event_name".to_string();
|
||||
let event_attributes = vec![KeyValue {
|
||||
key: "str_key".into(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(any_value::Value::StringValue("val1".into())),
|
||||
}),
|
||||
}];
|
||||
|
||||
let span_events = vec![SpanEvent {
|
||||
name: event_name.clone(),
|
||||
time: Timestamp::new_nanosecond(time_unix_nano as i64).to_iso8601_string(),
|
||||
attributes: vec_kv_to_string(&event_attributes),
|
||||
}];
|
||||
let expect_string = serde_json::to_string(&span_events).unwrap_or_default();
|
||||
|
||||
let events = vec![Event {
|
||||
time_unix_nano,
|
||||
name: event_name,
|
||||
attributes: event_attributes,
|
||||
dropped_attributes_count: 0,
|
||||
}];
|
||||
let events_string = events_to_string(&events);
|
||||
|
||||
assert_eq!(expect_string, events_string);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_status_to_string() {
|
||||
let message = String::from("status message");
|
||||
let status = Status {
|
||||
code: 1,
|
||||
message: message.clone(),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
("STATUS_CODE_OK".into(), message),
|
||||
status_to_string(&Some(status)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
314
src/servers/src/otlp/trace/attributes.rs
Normal file
314
src/servers/src/otlp/trace/attributes.rs
Normal file
@@ -0,0 +1,314 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use opentelemetry_proto::tonic::common::v1::any_value::Value::{
|
||||
ArrayValue, BoolValue, BytesValue, DoubleValue, IntValue, KvlistValue, StringValue,
|
||||
};
|
||||
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
|
||||
use serde::ser::{SerializeMap, SerializeSeq};
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OtlpAnyValue<'a>(&'a AnyValue);
|
||||
|
||||
impl<'a> From<&'a AnyValue> for OtlpAnyValue<'a> {
|
||||
fn from(any_val: &'a AnyValue) -> Self {
|
||||
Self(any_val)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for OtlpAnyValue<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for OtlpAnyValue<'_> {
|
||||
fn serialize<S>(&self, zer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match &self.0.value {
|
||||
Some(val) => match &val {
|
||||
StringValue(v) => zer.serialize_str(v),
|
||||
BoolValue(v) => zer.serialize_bool(*v),
|
||||
IntValue(v) => zer.serialize_i64(*v),
|
||||
DoubleValue(v) => zer.serialize_f64(*v),
|
||||
ArrayValue(v) => {
|
||||
let mut seq = zer.serialize_seq(Some(v.values.len()))?;
|
||||
for val in &v.values {
|
||||
seq.serialize_element(&OtlpAnyValue::from(val))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
KvlistValue(v) => {
|
||||
let mut map = zer.serialize_map(Some(v.values.len()))?;
|
||||
for kv in &v.values {
|
||||
match &kv.value {
|
||||
Some(val) => map.serialize_entry(&kv.key, &OtlpAnyValue::from(val))?,
|
||||
None => map.serialize_entry(
|
||||
&kv.key,
|
||||
&OtlpAnyValue::from(&AnyValue { value: None }),
|
||||
)?,
|
||||
}
|
||||
}
|
||||
map.end()
|
||||
}
|
||||
BytesValue(v) => zer.serialize_bytes(v),
|
||||
},
|
||||
None => zer.serialize_none(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Attributes(Vec<KeyValue>);
|
||||
|
||||
impl Display for Attributes {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Attributes {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let mut map = serializer.serialize_map(Some(self.0.len()))?;
|
||||
for attr in &self.0 {
|
||||
match &attr.value {
|
||||
Some(val) => map.serialize_entry(&attr.key, &OtlpAnyValue::from(val))?,
|
||||
None => {
|
||||
map.serialize_entry(&attr.key, &OtlpAnyValue::from(&AnyValue { value: None }))?
|
||||
}
|
||||
}
|
||||
}
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<KeyValue>> for Attributes {
|
||||
fn from(attrs: Vec<KeyValue>) -> Self {
|
||||
Self(attrs)
|
||||
}
|
||||
}
|
||||
|
||||
impl Attributes {
|
||||
pub fn get_ref(&self) -> &Vec<KeyValue> {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self) -> &mut Vec<KeyValue> {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use opentelemetry_proto::tonic::common::v1::any_value::Value;
|
||||
use opentelemetry_proto::tonic::common::v1::{AnyValue, ArrayValue, KeyValue, KeyValueList};
|
||||
|
||||
use crate::otlp::trace::attributes::{Attributes, OtlpAnyValue};
|
||||
|
||||
#[test]
|
||||
fn test_null_value() {
|
||||
let otlp_value = OtlpAnyValue::from(&AnyValue { value: None });
|
||||
assert_eq!("null", serde_json::to_string(&otlp_value).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_any_value_primitive_type_serialize() {
|
||||
let values = vec![
|
||||
(
|
||||
r#""string value""#,
|
||||
Value::StringValue(String::from("string value")),
|
||||
),
|
||||
("true", Value::BoolValue(true)),
|
||||
("1", Value::IntValue(1)),
|
||||
("1.1", Value::DoubleValue(1.1)),
|
||||
("[1,2,3]", Value::BytesValue(vec![1, 2, 3])),
|
||||
];
|
||||
|
||||
for (expect, val) in values {
|
||||
let any_val = AnyValue { value: Some(val) };
|
||||
let otlp_value = OtlpAnyValue::from(&any_val);
|
||||
assert_eq!(expect, serde_json::to_string(&otlp_value).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_any_value_array_type_serialize() {
|
||||
let values = vec![
|
||||
("[]", vec![]),
|
||||
("[null]", vec![AnyValue { value: None }]),
|
||||
(
|
||||
r#"["string1","string2","string3"]"#,
|
||||
vec![
|
||||
AnyValue {
|
||||
value: Some(Value::StringValue(String::from("string1"))),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::StringValue(String::from("string2"))),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::StringValue(String::from("string3"))),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
"[1,2,3]",
|
||||
vec![
|
||||
AnyValue {
|
||||
value: Some(Value::IntValue(1)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::IntValue(2)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::IntValue(3)),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
"[1.1,2.2,3.3]",
|
||||
vec![
|
||||
AnyValue {
|
||||
value: Some(Value::DoubleValue(1.1)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::DoubleValue(2.2)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::DoubleValue(3.3)),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
"[true,false,true]",
|
||||
vec![
|
||||
AnyValue {
|
||||
value: Some(Value::BoolValue(true)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::BoolValue(false)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::BoolValue(true)),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
r#"[1,1.1,"str_value",true,null]"#,
|
||||
vec![
|
||||
AnyValue {
|
||||
value: Some(Value::IntValue(1)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::DoubleValue(1.1)),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::StringValue("str_value".into())),
|
||||
},
|
||||
AnyValue {
|
||||
value: Some(Value::BoolValue(true)),
|
||||
},
|
||||
AnyValue { value: None },
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
for (expect, values) in values {
|
||||
let any_val = AnyValue {
|
||||
value: Some(Value::ArrayValue(ArrayValue { values })),
|
||||
};
|
||||
let otlp_value = OtlpAnyValue::from(&any_val);
|
||||
assert_eq!(expect, serde_json::to_string(&otlp_value).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_any_value_map_type_serialize() {
|
||||
let cases = vec![
|
||||
("{}", vec![]),
|
||||
(
|
||||
r#"{"key1":null}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: None,
|
||||
}],
|
||||
),
|
||||
(
|
||||
r#"{"key1":null}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: Some(AnyValue { value: None }),
|
||||
}],
|
||||
),
|
||||
(
|
||||
r#"{"key1":"val1"}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(Value::StringValue(String::from("val1"))),
|
||||
}),
|
||||
}],
|
||||
),
|
||||
];
|
||||
|
||||
for (expect, values) in cases {
|
||||
let any_val = AnyValue {
|
||||
value: Some(Value::KvlistValue(KeyValueList { values })),
|
||||
};
|
||||
let otlp_value = OtlpAnyValue::from(&any_val);
|
||||
assert_eq!(expect, serde_json::to_string(&otlp_value).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_attributes_serialize() {
|
||||
let cases = vec![
|
||||
("{}", vec![]),
|
||||
(
|
||||
r#"{"key1":null}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: None,
|
||||
}],
|
||||
),
|
||||
(
|
||||
r#"{"key1":null}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: Some(AnyValue { value: None }),
|
||||
}],
|
||||
),
|
||||
(
|
||||
r#"{"key1":"val1"}"#,
|
||||
vec![KeyValue {
|
||||
key: "key1".into(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(Value::StringValue(String::from("val1"))),
|
||||
}),
|
||||
}],
|
||||
),
|
||||
];
|
||||
|
||||
for (expect, values) in cases {
|
||||
assert_eq!(expect, serde_json::to_string(&Attributes(values)).unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
223
src/servers/src/otlp/trace/span.rs
Normal file
223
src/servers/src/otlp/trace/span.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::ColumnDataType;
|
||||
use common_time::timestamp::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue};
|
||||
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
|
||||
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::attributes::Attributes;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TraceSpan {
|
||||
// the following are tags
|
||||
pub trace_id: String,
|
||||
pub span_id: String,
|
||||
pub parent_span_id: String,
|
||||
|
||||
// the following are fields
|
||||
pub resource_attributes: Attributes, // TODO(yuanbohan): Map in the future
|
||||
pub scope_name: String,
|
||||
pub scope_version: String,
|
||||
pub scope_attributes: Attributes, // TODO(yuanbohan): Map in the future
|
||||
pub trace_state: String,
|
||||
pub span_name: String,
|
||||
pub span_kind: String,
|
||||
pub span_status_code: String,
|
||||
pub span_status_message: String,
|
||||
pub span_attributes: Attributes, // TODO(yuanbohan): Map in the future
|
||||
pub span_events: SpanEvents, // TODO(yuanbohan): List in the future
|
||||
pub span_links: SpanLinks, // TODO(yuanbohan): List in the future
|
||||
pub start_in_nanosecond: u64, // this is also the Timestamp Index
|
||||
pub end_in_nanosecond: u64,
|
||||
|
||||
pub uplifted_span_attributes: Vec<(String, ColumnDataType, ValueData)>,
|
||||
}
|
||||
|
||||
pub type TraceSpans = Vec<TraceSpan>;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SpanLink {
|
||||
pub trace_id: String,
|
||||
pub span_id: String,
|
||||
pub trace_state: String,
|
||||
pub attributes: Attributes, // TODO(yuanbohan): Map in the future
|
||||
}
|
||||
|
||||
impl From<Link> for SpanLink {
|
||||
fn from(link: Link) -> Self {
|
||||
Self {
|
||||
trace_id: bytes_to_hex_string(&link.trace_id),
|
||||
span_id: bytes_to_hex_string(&link.span_id),
|
||||
trace_state: link.trace_state,
|
||||
attributes: Attributes::from(link.attributes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SpanLinks(Vec<SpanLink>);
|
||||
|
||||
impl From<Vec<Link>> for SpanLinks {
|
||||
fn from(value: Vec<Link>) -> Self {
|
||||
let links = value.into_iter().map(SpanLink::from).collect_vec();
|
||||
Self(links)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for SpanLinks {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
impl SpanLinks {
|
||||
pub fn get_ref(&self) -> &Vec<SpanLink> {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self) -> &mut Vec<SpanLink> {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SpanEvent {
|
||||
pub name: String,
|
||||
pub time: String,
|
||||
pub attributes: Attributes, // TODO(yuanbohan): Map in the future
|
||||
}
|
||||
|
||||
impl From<Event> for SpanEvent {
|
||||
fn from(event: Event) -> Self {
|
||||
Self {
|
||||
name: event.name,
|
||||
time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
|
||||
attributes: Attributes::from(event.attributes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SpanEvents(Vec<SpanEvent>);
|
||||
|
||||
impl From<Vec<Event>> for SpanEvents {
|
||||
fn from(value: Vec<Event>) -> Self {
|
||||
let events = value.into_iter().map(SpanEvent::from).collect_vec();
|
||||
Self(events)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for SpanEvents {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
impl SpanEvents {
|
||||
pub fn get_ref(&self) -> &Vec<SpanEvent> {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self) -> &mut Vec<SpanEvent> {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_span(
|
||||
resource_attrs: Vec<KeyValue>,
|
||||
scope: InstrumentationScope,
|
||||
span: Span,
|
||||
) -> TraceSpan {
|
||||
let (span_status_code, span_status_message) = status_to_string(&span.status);
|
||||
let span_kind = span.kind().as_str_name().into();
|
||||
TraceSpan {
|
||||
trace_id: bytes_to_hex_string(&span.trace_id),
|
||||
span_id: bytes_to_hex_string(&span.span_id),
|
||||
parent_span_id: bytes_to_hex_string(&span.parent_span_id),
|
||||
|
||||
resource_attributes: Attributes::from(resource_attrs),
|
||||
trace_state: span.trace_state,
|
||||
|
||||
scope_name: scope.name,
|
||||
scope_version: scope.version,
|
||||
scope_attributes: Attributes::from(scope.attributes),
|
||||
|
||||
span_name: span.name,
|
||||
span_kind,
|
||||
span_status_code,
|
||||
span_status_message,
|
||||
span_attributes: Attributes::from(span.attributes),
|
||||
span_events: SpanEvents::from(span.events),
|
||||
span_links: SpanLinks::from(span.links),
|
||||
|
||||
start_in_nanosecond: span.start_time_unix_nano,
|
||||
end_in_nanosecond: span.end_time_unix_nano,
|
||||
|
||||
uplifted_span_attributes: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bytes_to_hex_string(bs: &[u8]) -> String {
|
||||
bs.iter().map(|b| format!("{:02x}", b)).join("")
|
||||
}
|
||||
|
||||
pub fn status_to_string(status: &Option<Status>) -> (String, String) {
|
||||
match status {
|
||||
Some(status) => (status.code().as_str_name().into(), status.message.clone()),
|
||||
None => ("".into(), "".into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use opentelemetry_proto::tonic::trace::v1::Status;
|
||||
|
||||
use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string};
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_hex_string() {
|
||||
assert_eq!(
|
||||
"24fe79948641b110a29bc27859307e8d",
|
||||
bytes_to_hex_string(&[
|
||||
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
|
||||
])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"baffeedd7b8debc0",
|
||||
bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_status_to_string() {
|
||||
let message = String::from("status message");
|
||||
let status = Status {
|
||||
code: 1,
|
||||
message: message.clone(),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
("STATUS_CODE_OK".into(), message),
|
||||
status_to_string(&Some(status)),
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user