diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 7654f7bb41..2e135cfd87 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; @@ -28,6 +28,7 @@ use common_function::scalars::udf::create_udf; use common_query::{Output, OutputData}; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::util; +use common_telemetry::warn; use datafusion::dataframe::DataFrame; use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionContext; @@ -42,8 +43,9 @@ use servers::error::{ }; use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams}; use servers::otlp::trace::{ - DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN, - SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, + DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, + SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR, + TIMESTAMP_COLUMN, TRACE_ID_COLUMN, }; use servers::query_handler::JaegerQueryHandler; use session::context::QueryContextRef; @@ -322,6 +324,7 @@ async fn query_trace_table( })?; let is_data_model_v1 = table + .clone() .table_info() .meta .options @@ -330,6 +333,14 @@ async fn query_trace_table( .map(|s| s.as_str()) == Some(TABLE_DATA_MODEL_TRACE_V1); + // collect to set + let col_names = table + .table_info() + .meta + .field_column_names() + .map(|s| format!("\"{}\"", s)) + .collect::>(); + let df_context = create_df_context(query_engine)?; let dataframe = df_context @@ -342,7 +353,7 @@ async fn query_trace_table( let dataframe = filters .into_iter() .chain(tags.map_or(Ok(vec![]), |t| { - tags_filters(&dataframe, t, is_data_model_v1) + tags_filters(&dataframe, t, is_data_model_v1, &col_names) })?) .try_fold(dataframe, |df, expr| { df.filter(expr).context(DataFusionSnafu) @@ -472,23 +483,73 @@ fn json_tag_filters( Ok(filters) } -fn flatten_tag_filters(tags: HashMap) -> ServerResult> { +/// Helper function to check if span_key or resource_key exists in col_names and create an expression. +/// If neither exists, logs a warning and returns None. +#[inline] +fn check_col_and_build_expr( + span_key: String, + resource_key: String, + key: &str, + col_names: &HashSet, + expr_builder: F, +) -> Option +where + F: FnOnce(String) -> Expr, +{ + if col_names.contains(&span_key) { + return Some(expr_builder(span_key)); + } + if col_names.contains(&resource_key) { + return Some(expr_builder(resource_key)); + } + warn!("tag key {} not found in table columns", key); + None +} + +fn flatten_tag_filters( + tags: HashMap, + col_names: &HashSet, +) -> ServerResult> { let filters = tags .into_iter() .filter_map(|(key, value)| { - let key = format!("\"span_attributes.{}\"", key); + if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) { + return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR))); + } + + // TODO(shuiyisong): add more precise mapping from key to col name + let span_key = format!("\"span_attributes.{}\"", key); + let resource_key = format!("\"resource_attributes.{}\"", key); match value { - JsonValue::String(value) => Some(col(key).eq(lit(value))), + JsonValue::String(value) => { + check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| { + col(k).eq(lit(value)) + }) + } JsonValue::Number(value) => { if value.is_f64() { // safe to unwrap as checked previously - Some(col(key).eq(lit(value.as_f64().unwrap()))) + let value = value.as_f64().unwrap(); + check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| { + col(k).eq(lit(value)) + }) } else { - Some(col(key).eq(lit(value.as_i64().unwrap()))) + let value = value.as_i64().unwrap(); + check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| { + col(k).eq(lit(value)) + }) } } - JsonValue::Bool(value) => Some(col(key).eq(lit(value))), - JsonValue::Null => Some(col(key).is_null()), + JsonValue::Bool(value) => { + check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| { + col(k).eq(lit(value)) + }) + } + JsonValue::Null => { + check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| { + col(k).is_null() + }) + } // not supported at the moment JsonValue::Array(_value) => None, JsonValue::Object(_value) => None, @@ -502,9 +563,10 @@ fn tags_filters( dataframe: &DataFrame, tags: HashMap, is_data_model_v1: bool, + col_names: &HashSet, ) -> ServerResult> { if is_data_model_v1 { - flatten_tag_filters(tags) + flatten_tag_filters(tags, col_names) } else { json_tag_filters(dataframe, tags) } diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index 9420c5ca2f..8d15db2a5b 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -40,10 +40,12 @@ use crate::http::extractor::TraceTableName; use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED; use crate::otlp::trace::{ DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE, - KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, - SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, - SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, - SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, + KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME, + KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN, + SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, + SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR, + SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, + TRACE_ID_COLUMN, TRACE_STATE_COLUMN, }; use crate::query_handler::JaegerQueryHandlerRef; @@ -654,7 +656,10 @@ async fn covert_to_records(output: Output) -> Result> .await .context(CollectRecordbatchSnafu)?, )?; - debug!("The query records: {:?}", records); + debug!( + "The query records: {}", + serde_json::to_string(&records).unwrap() + ); Ok(Some(records)) } // It's unlikely to happen. However, if the output is not a stream, return None. @@ -859,6 +864,38 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result> { value_type: ValueType::String, value: Value::String(normalize_status_code(&span_status)), }); + // set error to comply with the Jaeger API + if span_status == SPAN_STATUS_ERROR { + span.tags.push(KeyValue { + key: KEY_OTEL_STATUS_ERROR_KEY.to_string(), + value_type: ValueType::Boolean, + value: Value::Boolean(true), + }); + } + } + } + + SPAN_STATUS_MESSAGE_COLUMN => { + if let JsonValue::String(span_status_message) = cell + && !span_status_message.is_empty() + { + span.tags.push(KeyValue { + key: KEY_OTEL_STATUS_MESSAGE.to_string(), + value_type: ValueType::String, + value: Value::String(span_status_message), + }); + } + } + + TRACE_STATE_COLUMN => { + if let JsonValue::String(trace_state) = cell + && !trace_state.is_empty() + { + span.tags.push(KeyValue { + key: KEY_OTEL_TRACE_STATE.to_string(), + value_type: ValueType::String, + value: Value::String(trace_state), + }); } } diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 19abc74daf..b724bb1d22 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -34,11 +34,13 @@ pub const TIMESTAMP_COLUMN: &str = "timestamp"; pub const DURATION_NANO_COLUMN: &str = "duration_nano"; pub const SPAN_KIND_COLUMN: &str = "span_kind"; pub const SPAN_STATUS_CODE: &str = "span_status_code"; +pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message"; pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes"; pub const SPAN_EVENTS_COLUMN: &str = "span_events"; pub const SCOPE_NAME_COLUMN: &str = "scope_name"; pub const SCOPE_VERSION_COLUMN: &str = "scope_version"; pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes"; +pub const TRACE_STATE_COLUMN: &str = "trace_state"; // const keys pub const KEY_SERVICE_NAME: &str = "service.name"; @@ -49,6 +51,9 @@ pub const KEY_SPAN_KIND: &str = "span.kind"; pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name"; pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version"; pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code"; +pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description"; +pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error"; +pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate"; /// The span kind prefix in the database. /// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database. @@ -57,6 +62,7 @@ pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_"; // The span status code prefix in the database. pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_"; pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET"; +pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR"; /// Convert SpanTraces to GreptimeDB row insert requests. /// Returns `InsertRequests` and total number of rows to ingest diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs index d45b0e6802..b52b406fb2 100644 --- a/src/servers/src/otlp/trace/v0.rs +++ b/src/servers/src/otlp/trace/v0.rs @@ -26,8 +26,8 @@ use crate::error::Result; use crate::otlp::trace::span::{TraceSpan, parse}; use crate::otlp::trace::{ DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, - SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, - TRACE_ID_COLUMN, + SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, + SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, }; use crate::otlp::utils::{make_column_data, make_string_column_data}; use crate::query_handler::PipelineHandlerRef; @@ -124,9 +124,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id), make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)), make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)), - make_string_column_data("span_status_code", Some(span.span_status_code)), - make_string_column_data("span_status_message", Some(span.span_status_message)), - make_string_column_data("trace_state", Some(span.trace_state)), + make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)), + make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)), + make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)), ]; row_writer::write_fields(writer, fields.into_iter(), &mut row)?; diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs index b7dcdbce7c..86f8229769 100644 --- a/src/servers/src/otlp/trace/v1.rs +++ b/src/servers/src/otlp/trace/v1.rs @@ -27,9 +27,10 @@ use crate::error::Result; use crate::otlp::trace::attributes::Attributes; use crate::otlp::trace::span::{TraceSpan, parse}; use crate::otlp::trace::{ - DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, - SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, - TRACE_ID_COLUMN, + DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN, + SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, + SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN, + TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, }; use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data}; use crate::query_handler::PipelineHandlerRef; @@ -135,11 +136,11 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)), make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)), make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)), - make_string_column_data("span_status_code", Some(span.span_status_code)), - make_string_column_data("span_status_message", Some(span.span_status_message)), - make_string_column_data("trace_state", Some(span.trace_state)), - make_string_column_data("scope_name", Some(span.scope_name)), - make_string_column_data("scope_version", Some(span.scope_version)), + make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)), + make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)), + make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)), + make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)), + make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)), ]; row_writer::write_fields(writer, fields.into_iter(), &mut row)?; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0976a831cd..93dea19337 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -5633,6 +5633,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -5675,6 +5680,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -5750,6 +5760,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -5792,6 +5807,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -5975,6 +5995,34 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { } } ] + }, + { + "scope": { + "name": "test-jaeger-find-traces", + "version": "1.0.0" + }, + "spans": [ + { + "traceId": "5611dce1bc9ebed65352d99a027b08fa", + "spanId": "ffa03416a7b9ea49", + "name": "access-pg", + "kind": 2, + "startTimeUnixNano": "1738726754492422000", + "endTimeUnixNano": "1738726754592422000", + "attributes": [ + { + "key": "operation.type", + "value": { + "stringValue": "access-pg" + } + } + ], + "status": { + "message": "success", + "code": 0 + } + } + ] } ], "schemaUrl": "https://opentelemetry.io/schemas/1.4.0" @@ -6143,6 +6191,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6185,6 +6238,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6259,6 +6317,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6301,6 +6364,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6376,6 +6444,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6418,6 +6491,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "type": "string", "value": "1.0.0" }, + { + "key": "otel.status_description", + "type": "string", + "value": "success" + }, { "key": "peer.service", "type": "string", @@ -6452,6 +6530,39 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { let expected: Value = serde_json::from_str(expected).unwrap(); assert_eq!(resp, expected); + // Test `/api/traces` API with tags. + // 1. first query without tags, get 2 results + let res = client + .get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754492422&end=1738726754592422") + .header("x-greptime-trace-table-name", trace_table_name) + .send() + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = r#" +{"data":[{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-pg","processID":"p1","references":[],"spanID":"ffa03416a7b9ea49","startTime":1738726754492422,"tags":[{"key":"operation.type","type":"string","value":"access-pg"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-find-traces"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"},{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-redis","processID":"p1","references":[],"spanID":"ffa03416a7b9ea48","startTime":1738726754492422,"tags":[{"key":"net.peer.ip","type":"string","value":"1.2.3.4"},{"key":"operation.type","type":"string","value":"access-redis"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-query-api"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"peer.service","type":"string","value":"test-jaeger-query-api"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08ea"}],"traceID":"5611dce1bc9ebed65352d99a027b08ea"}],"errors":[],"limit":0,"offset":0,"total":0} + "#; + + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + // 2. second query with tags, get 1 result + let res = client +.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754492422&end=1738726754592422&tags=%7B%22operation.type%22%3A%22access-pg%22%7D") +.header("x-greptime-trace-table-name", trace_table_name) +.send() +.await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = r#" +{"data":[{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-pg","processID":"p1","references":[],"spanID":"ffa03416a7b9ea49","startTime":1738726754492422,"tags":[{"key":"operation.type","type":"string","value":"access-pg"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-find-traces"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"errors":[],"limit":0,"offset":0,"total":0} +"#; + + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + guard.remove_all().await; }