mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
chore: improve /v1/jaeger/api/trace/{trace_id}'s resp (#5663)
* chore: improve jaeger trace api resp * chore: fix timestamp type * chore: fix timestamp type * chore: complete more fields * chore: change to microseconds * chore: add empty check & span status code * chore: minor update * chore: update test
This commit is contained in:
@@ -35,11 +35,10 @@ use servers::error::{
|
||||
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use servers::http::jaeger::QueryTraceParams;
|
||||
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
|
||||
use servers::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
|
||||
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
TRACE_TABLE_NAME,
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
|
||||
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
|
||||
};
|
||||
use servers::query_handler::JaegerQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -102,16 +101,9 @@ impl JaegerQueryHandler for Instance {
|
||||
}
|
||||
|
||||
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
|
||||
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
|
||||
let selects = vec![
|
||||
col(TRACE_ID_COLUMN),
|
||||
col(TIMESTAMP_COLUMN),
|
||||
col(DURATION_NANO_COLUMN),
|
||||
col(SERVICE_NAME_COLUMN),
|
||||
col(SPAN_NAME_COLUMN),
|
||||
col(SPAN_ID_COLUMN),
|
||||
col(SPAN_ATTRIBUTES_COLUMN),
|
||||
];
|
||||
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
|
||||
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
|
||||
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
|
||||
|
||||
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
|
||||
@@ -133,15 +125,7 @@ impl JaegerQueryHandler for Instance {
|
||||
ctx: QueryContextRef,
|
||||
query_params: QueryTraceParams,
|
||||
) -> ServerResult<Output> {
|
||||
let selects = vec![
|
||||
col(TRACE_ID_COLUMN),
|
||||
col(TIMESTAMP_COLUMN),
|
||||
col(DURATION_NANO_COLUMN),
|
||||
col(SERVICE_NAME_COLUMN),
|
||||
col(SPAN_NAME_COLUMN),
|
||||
col(SPAN_ID_COLUMN),
|
||||
col(SPAN_ATTRIBUTES_COLUMN),
|
||||
];
|
||||
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
|
||||
|
||||
let mut filters = vec![];
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -19,11 +20,14 @@ use axum::extract::{Path, Query, State};
|
||||
use axum::http::StatusCode as HttpStatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Extension;
|
||||
use common_catalog::consts::PARENT_SPAN_ID_COLUMN;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::util;
|
||||
use common_telemetry::{debug, error, tracing, warn};
|
||||
use datafusion_expr::{col, Expr};
|
||||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use session::context::{Channel, QueryContext};
|
||||
@@ -35,12 +39,53 @@ use crate::error::{
|
||||
use crate::http::HttpRecordsOutput;
|
||||
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
|
||||
use crate::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
|
||||
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
TRACE_TABLE_NAME,
|
||||
DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
|
||||
KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
|
||||
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
|
||||
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
|
||||
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
|
||||
};
|
||||
use crate::query_handler::JaegerQueryHandlerRef;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref FIND_TRACES_COLS: Vec<Expr> = vec![
|
||||
col(TRACE_ID_COLUMN),
|
||||
col(TIMESTAMP_COLUMN),
|
||||
col(DURATION_NANO_COLUMN),
|
||||
col(SERVICE_NAME_COLUMN),
|
||||
col(SPAN_NAME_COLUMN),
|
||||
col(SPAN_ID_COLUMN),
|
||||
col(SPAN_ATTRIBUTES_COLUMN),
|
||||
col(RESOURCE_ATTRIBUTES_COLUMN),
|
||||
col(PARENT_SPAN_ID_COLUMN),
|
||||
col(SPAN_EVENTS_COLUMN),
|
||||
col(SCOPE_NAME_COLUMN),
|
||||
col(SCOPE_VERSION_COLUMN),
|
||||
col(SPAN_KIND_COLUMN),
|
||||
col(SPAN_STATUS_CODE),
|
||||
];
|
||||
static ref FIND_TRACES_SCHEMA: Vec<(&'static str, &'static str)> = vec![
|
||||
(TRACE_ID_COLUMN, "String"),
|
||||
(TIMESTAMP_COLUMN, "TimestampNanosecond"),
|
||||
(DURATION_NANO_COLUMN, "UInt64"),
|
||||
(SERVICE_NAME_COLUMN, "String"),
|
||||
(SPAN_NAME_COLUMN, "String"),
|
||||
(SPAN_ID_COLUMN, "String"),
|
||||
(SPAN_ATTRIBUTES_COLUMN, "Json"),
|
||||
(RESOURCE_ATTRIBUTES_COLUMN, "Json"),
|
||||
(PARENT_SPAN_ID_COLUMN, "String"),
|
||||
(SPAN_EVENTS_COLUMN, "Json"),
|
||||
(SCOPE_NAME_COLUMN, "String"),
|
||||
(SCOPE_VERSION_COLUMN, "String"),
|
||||
(SPAN_KIND_COLUMN, "String"),
|
||||
(SPAN_STATUS_CODE, "String"),
|
||||
];
|
||||
}
|
||||
|
||||
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
|
||||
|
||||
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
|
||||
|
||||
/// JaegerAPIResponse is the response of Jaeger HTTP API.
|
||||
/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
|
||||
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
|
||||
@@ -154,7 +199,7 @@ pub struct Process {
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Log {
|
||||
pub timestamp: i64,
|
||||
pub timestamp: u64,
|
||||
pub fields: Vec<KeyValue>,
|
||||
}
|
||||
|
||||
@@ -387,29 +432,35 @@ pub async fn handle_get_trace(
|
||||
.with_label_values(&[&db, "/api/traces"])
|
||||
.start_timer();
|
||||
|
||||
match handler.get_trace(query_ctx, &trace_id).await {
|
||||
Ok(output) => match covert_to_records(output).await {
|
||||
Ok(Some(records)) => match traces_from_records(records) {
|
||||
Ok(traces) => (
|
||||
HttpStatusCode::OK,
|
||||
axum::Json(JaegerAPIResponse {
|
||||
data: Some(JaegerData::Traces(traces)),
|
||||
..Default::default()
|
||||
}),
|
||||
),
|
||||
Err(err) => {
|
||||
error!("Failed to get trace '{}': {:?}", trace_id, err);
|
||||
error_response(err)
|
||||
}
|
||||
},
|
||||
Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
|
||||
let output = match handler.get_trace(query_ctx, &trace_id).await {
|
||||
Ok(output) => output,
|
||||
Err(err) => {
|
||||
return handle_query_error(
|
||||
err,
|
||||
&format!("Failed to get trace for '{}'", trace_id),
|
||||
&db,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
match covert_to_records(output).await {
|
||||
Ok(Some(records)) => match traces_from_records(records) {
|
||||
Ok(traces) => (
|
||||
HttpStatusCode::OK,
|
||||
axum::Json(JaegerAPIResponse {
|
||||
data: Some(JaegerData::Traces(traces)),
|
||||
..Default::default()
|
||||
}),
|
||||
),
|
||||
Err(err) => {
|
||||
error!("Failed to get trace '{}': {:?}", trace_id, err);
|
||||
error_response(err)
|
||||
}
|
||||
},
|
||||
Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
|
||||
Err(err) => {
|
||||
handle_query_error(err, &format!("Failed to get trace for '{}'", trace_id), &db)
|
||||
error!("Failed to get trace '{}': {:?}", trace_id, err);
|
||||
error_response(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -641,21 +692,15 @@ fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>)
|
||||
}
|
||||
// Construct Jaeger traces from records.
|
||||
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
let expected_schema = vec![
|
||||
(TRACE_ID_COLUMN, "String"),
|
||||
(TIMESTAMP_COLUMN, "TimestampNanosecond"),
|
||||
(DURATION_NANO_COLUMN, "UInt64"),
|
||||
(SERVICE_NAME_COLUMN, "String"),
|
||||
(SPAN_NAME_COLUMN, "String"),
|
||||
(SPAN_ID_COLUMN, "String"),
|
||||
(SPAN_ATTRIBUTES_COLUMN, "Json"),
|
||||
];
|
||||
let expected_schema = FIND_TRACES_SCHEMA.clone();
|
||||
check_schema(&records, &expected_schema)?;
|
||||
|
||||
// maintain the mapping: trace_id -> (process_id -> service_name).
|
||||
let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
|
||||
// maintain the mapping: trace_id -> spans.
|
||||
let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
|
||||
// maintain the mapping: service.name -> resource.attributes.
|
||||
let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
|
||||
|
||||
for row in records.rows.into_iter() {
|
||||
let mut span = Span::default();
|
||||
@@ -713,32 +758,114 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
|
||||
// Convert span attributes to tags.
|
||||
if let Some(JsonValue::Object(object)) = row_iter.next() {
|
||||
let tags = object
|
||||
.into_iter()
|
||||
.filter_map(|(key, value)| match value {
|
||||
JsonValue::String(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(value.to_string()),
|
||||
}),
|
||||
JsonValue::Number(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::Int64,
|
||||
value: Value::Int64(value.as_i64().unwrap_or(0)),
|
||||
}),
|
||||
JsonValue::Bool(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::Boolean,
|
||||
value: Value::Boolean(value),
|
||||
}),
|
||||
// FIXME(zyy17): Do we need to support other types?
|
||||
_ => {
|
||||
warn!("Unsupported value type: {:?}", value);
|
||||
None
|
||||
span.tags = object_to_tags(object);
|
||||
}
|
||||
|
||||
// Save resource attributes with service name.
|
||||
if let Some(JsonValue::Object(mut object)) = row_iter.next() {
|
||||
if let Some(service_name) = object
|
||||
.remove(KEY_SERVICE_NAME)
|
||||
.and_then(|v| v.as_str().map(|s| s.to_string()))
|
||||
{
|
||||
match service_to_resource_attributes.entry(service_name) {
|
||||
Occupied(_) => {}
|
||||
Vacant(vacant) => {
|
||||
let _ = vacant.insert(object_to_tags(object));
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
span.tags = tags;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set parent span id.
|
||||
if let Some(JsonValue::String(parent_span_id)) = row_iter.next()
|
||||
&& !parent_span_id.is_empty()
|
||||
{
|
||||
span.references.push(Reference {
|
||||
trace_id: span.trace_id.clone(),
|
||||
span_id: parent_span_id,
|
||||
ref_type: REF_TYPE_CHILD_OF.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span events to logs.
|
||||
if let Some(JsonValue::Array(events)) = row_iter.next() {
|
||||
for event in events {
|
||||
if let JsonValue::Object(mut obj) = event {
|
||||
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(t) = obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
|
||||
SPAN_KIND_TIME_FMTS
|
||||
.iter()
|
||||
.find_map(|fmt| chrono::NaiveDateTime::parse_from_str(s, fmt).ok())
|
||||
.map(|dt| dt.and_utc().timestamp_micros() as u64)
|
||||
}) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut fields = vec![KeyValue {
|
||||
key: "event".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(action.to_string()),
|
||||
}];
|
||||
|
||||
// Add event attributes as fields
|
||||
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
|
||||
fields.extend(object_to_tags(attrs));
|
||||
}
|
||||
|
||||
span.logs.push(Log {
|
||||
timestamp: t,
|
||||
fields,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set scope name.
|
||||
if let Some(JsonValue::String(scope_name)) = row_iter.next()
|
||||
&& !scope_name.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_NAME.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_name),
|
||||
});
|
||||
}
|
||||
|
||||
// Set scope version.
|
||||
if let Some(JsonValue::String(scope_version)) = row_iter.next()
|
||||
&& !scope_version.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_VERSION.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_version),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span kind.
|
||||
if let Some(JsonValue::String(span_kind)) = row_iter.next()
|
||||
&& !span_kind.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_SPAN_KIND.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_span_kind(&span_kind)),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span status code.
|
||||
if let Some(JsonValue::String(span_status_code)) = row_iter.next()
|
||||
&& span_status_code != SPAN_STATUS_UNSET
|
||||
&& !span_status_code.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_STATUS_CODE.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_status_code(&span_status_code)),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
|
||||
@@ -759,13 +886,10 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
|
||||
let mut process_id_to_process = HashMap::new();
|
||||
for (service_name, process_id) in processes.into_iter() {
|
||||
process_id_to_process.insert(
|
||||
process_id,
|
||||
Process {
|
||||
service_name,
|
||||
tags: vec![],
|
||||
},
|
||||
);
|
||||
let tags = service_to_resource_attributes
|
||||
.remove(&service_name)
|
||||
.unwrap_or_default();
|
||||
process_id_to_process.insert(process_id, Process { service_name, tags });
|
||||
}
|
||||
trace.processes = process_id_to_process;
|
||||
}
|
||||
@@ -775,6 +899,45 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
Ok(traces)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
|
||||
object
|
||||
.into_iter()
|
||||
.filter_map(|(key, value)| match value {
|
||||
JsonValue::String(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(value.to_string()),
|
||||
}),
|
||||
JsonValue::Number(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::Int64,
|
||||
value: Value::Int64(value.as_i64().unwrap_or(0)),
|
||||
}),
|
||||
JsonValue::Bool(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::Boolean,
|
||||
value: Value::Boolean(value),
|
||||
}),
|
||||
JsonValue::Array(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(serde_json::to_string(&value).unwrap()),
|
||||
}),
|
||||
JsonValue::Object(value) => Some(KeyValue {
|
||||
key,
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(serde_json::to_string(&value).unwrap()),
|
||||
}),
|
||||
// FIXME(zyy17): Do we need to support other types?
|
||||
_ => {
|
||||
warn!("Unsupported value type: {:?}", value);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
|
||||
let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
|
||||
check_schema(&records, &expected_schema)?;
|
||||
@@ -850,6 +1013,18 @@ fn normalize_span_kind(span_kind: &str) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
|
||||
// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
|
||||
fn normalize_status_code(status_code: &str) -> String {
|
||||
// If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
|
||||
if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
|
||||
stripped.to_string()
|
||||
} else {
|
||||
// It's unlikely to happen
|
||||
status_code.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
|
||||
if let Some(data) = input.as_str() {
|
||||
if let Ok(number) = data.parse::<i64>() {
|
||||
|
||||
@@ -30,15 +30,35 @@ use crate::query_handler::PipelineHandlerRef;
|
||||
|
||||
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
|
||||
|
||||
// column names
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const TIMESTAMP_COLUMN: &str = "timestamp";
|
||||
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
|
||||
pub const SPAN_KIND_COLUMN: &str = "span_kind";
|
||||
pub const SPAN_STATUS_CODE: &str = "span_status_code";
|
||||
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
|
||||
pub const SPAN_EVENTS_COLUMN: &str = "span_events";
|
||||
pub const SCOPE_NAME_COLUMN: &str = "scope_name";
|
||||
pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
|
||||
pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
|
||||
|
||||
// const keys
|
||||
pub const KEY_SERVICE_NAME: &str = "service.name";
|
||||
pub const KEY_SPAN_KIND: &str = "span.kind";
|
||||
|
||||
// jaeger const keys, not sure if they are general
|
||||
pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
|
||||
pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
|
||||
pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
|
||||
|
||||
/// The span kind prefix in the database.
|
||||
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
|
||||
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
|
||||
|
||||
// The span status code prefix in the database.
|
||||
pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
|
||||
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
pub fn to_grpc_insert_requests(
|
||||
|
||||
@@ -23,6 +23,7 @@ use opentelemetry_proto::tonic::trace::v1::{Span, Status};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::attributes::Attributes;
|
||||
use crate::otlp::trace::KEY_SERVICE_NAME;
|
||||
use crate::otlp::utils::bytes_to_hex_string;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -251,7 +252,7 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
.unwrap_or_default();
|
||||
let service_name = resource_attrs
|
||||
.iter()
|
||||
.find_or_first(|kv| kv.key == "service.name")
|
||||
.find_or_first(|kv| kv.key == KEY_SERVICE_NAME)
|
||||
.and_then(|kv| kv.value.clone())
|
||||
.and_then(|v| match v.value {
|
||||
Some(any_value::Value::StringValue(s)) => Some(s),
|
||||
|
||||
@@ -25,6 +25,7 @@ use super::{
|
||||
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use crate::error::Result;
|
||||
use crate::otlp::trace::SPAN_EVENTS_COLUMN;
|
||||
use crate::otlp::utils::{make_column_data, make_string_column_data};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
@@ -112,7 +113,12 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
span.span_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
SPAN_EVENTS_COLUMN,
|
||||
span.span_events.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
|
||||
|
||||
// write fields
|
||||
|
||||
@@ -27,6 +27,7 @@ use super::{
|
||||
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use crate::error::Result;
|
||||
use crate::otlp::trace::{KEY_SERVICE_NAME, SPAN_EVENTS_COLUMN};
|
||||
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
@@ -131,7 +132,12 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
SPAN_EVENTS_COLUMN,
|
||||
span.span_events.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
|
||||
|
||||
writer.add_row(row);
|
||||
@@ -149,7 +155,7 @@ fn write_attributes(
|
||||
let key_suffix = attr.key;
|
||||
// skip resource_attributes.service.name because its already copied to
|
||||
// top level as `SERVICE_NAME_COLUMN`
|
||||
if prefix == "resource_attributes" && key_suffix == "service.name" {
|
||||
if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -2866,82 +2866,113 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected = r#"
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"data": [
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spans": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spans": [
|
||||
"spanID": "008421dbbd33a3e9",
|
||||
"operationName": "access-mysql",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "008421dbbd33a3e9",
|
||||
"operationName": "access-mysql",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-mysql"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "ffa03416a7b9ea48",
|
||||
"operationName": "access-redis",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-redis"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-mysql"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.name",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.version",
|
||||
"type": "string",
|
||||
"value": "1.0.0"
|
||||
},
|
||||
{
|
||||
"key": "span.kind",
|
||||
"type": "string",
|
||||
"value": "server"
|
||||
}
|
||||
],
|
||||
"processes": {
|
||||
"p1": {
|
||||
"serviceName": "test-jaeger-query-api",
|
||||
"tags": []
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
},
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "ffa03416a7b9ea48",
|
||||
"operationName": "access-redis",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-redis"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.name",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.version",
|
||||
"type": "string",
|
||||
"value": "1.0.0"
|
||||
},
|
||||
{
|
||||
"key": "span.kind",
|
||||
"type": "string",
|
||||
"value": "server"
|
||||
}
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
}
|
||||
],
|
||||
"total": 0,
|
||||
"limit": 0,
|
||||
"offset": 0,
|
||||
"errors": []
|
||||
"processes": {
|
||||
"p1": {
|
||||
"serviceName": "test-jaeger-query-api",
|
||||
"tags": []
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"total": 0,
|
||||
"limit": 0,
|
||||
"offset": 0,
|
||||
"errors": []
|
||||
}
|
||||
"#;
|
||||
|
||||
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
|
||||
let resp_txt = &res.text().await;
|
||||
let resp: Value = serde_json::from_str(resp_txt).unwrap();
|
||||
let expected: Value = serde_json::from_str(expected).unwrap();
|
||||
assert_eq!(resp, expected);
|
||||
|
||||
@@ -2952,55 +2983,71 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected = r#"
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"data": [
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spans": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spans": [
|
||||
"spanID": "008421dbbd33a3e9",
|
||||
"operationName": "access-mysql",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "008421dbbd33a3e9",
|
||||
"operationName": "access-mysql",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-mysql"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-mysql"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.name",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.version",
|
||||
"type": "string",
|
||||
"value": "1.0.0"
|
||||
},
|
||||
{
|
||||
"key": "span.kind",
|
||||
"type": "string",
|
||||
"value": "server"
|
||||
}
|
||||
],
|
||||
"processes": {
|
||||
"p1": {
|
||||
"serviceName": "test-jaeger-query-api",
|
||||
"tags": []
|
||||
}
|
||||
}
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
}
|
||||
],
|
||||
"total": 0,
|
||||
"limit": 0,
|
||||
"offset": 0,
|
||||
"errors": []
|
||||
"processes": {
|
||||
"p1": {
|
||||
"serviceName": "test-jaeger-query-api",
|
||||
"tags": []
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"total": 0,
|
||||
"limit": 0,
|
||||
"offset": 0,
|
||||
"errors": []
|
||||
}
|
||||
"#;
|
||||
|
||||
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
|
||||
let resp_txt = &res.text().await;
|
||||
let resp: Value = serde_json::from_str(resp_txt).unwrap();
|
||||
let expected: Value = serde_json::from_str(expected).unwrap();
|
||||
assert_eq!(resp, expected);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user