diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 980fd5d6f5..526c1926c5 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -24,12 +24,14 @@ use common_function::scalars::json::json_get::{ }; use common_function::scalars::udf::create_udf; use common_function::state::FunctionState; -use common_query::Output; +use common_query::{Output, OutputData}; use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::util; use datafusion::dataframe::DataFrame; use datafusion::execution::context::SessionContext; use datafusion::execution::SessionStateBuilder; use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr}; +use datatypes::value::ValueRef; use query::QueryEngineRef; use serde_json::Value as JsonValue; use servers::error::{ @@ -97,7 +99,7 @@ impl JaegerQueryHandler for Instance { filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000))); } - // It's equivalent to + // It's equivalent to the following SQL query: // // ``` // SELECT DISTINCT span_name, span_kind @@ -137,7 +139,7 @@ impl JaegerQueryHandler for Instance { start_time: Option, end_time: Option, ) -> ServerResult { - // It's equivalent to + // It's equivalent to the following SQL query: // // ``` // SELECT @@ -156,13 +158,11 @@ impl JaegerQueryHandler for Instance { let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))]; if let Some(start_time) = start_time { - // Microseconds to nanoseconds. - filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000))); + filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time))); } if let Some(end_time) = end_time { - // Microseconds to nanoseconds. - filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000))); + filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time))); } Ok(query_trace_table( @@ -184,10 +184,11 @@ impl JaegerQueryHandler for Instance { ctx: QueryContextRef, query_params: QueryTraceParams, ) -> ServerResult { - let selects = vec![wildcard()]; - let mut filters = vec![]; + // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here. + filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name))); + if let Some(operation_name) = query_params.operation_name { filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name))); } @@ -208,15 +209,73 @@ impl JaegerQueryHandler for Instance { filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration))); } + // Get all distinct trace ids that match the filters. + // It's equivalent to the following SQL query: + // + // ``` + // SELECT DISTINCT trace_id + // FROM + // {db}.{trace_table} + // WHERE + // service_name = '{service_name}' AND + // operation_name = '{operation_name}' AND + // timestamp >= {start_time} AND + // timestamp <= {end_time} AND + // duration >= {min_duration} AND + // duration <= {max_duration} + // LIMIT {limit} + // ```. + let output = query_trace_table( + ctx.clone(), + self.catalog_manager(), + self.query_engine(), + vec![wildcard()], + filters, + vec![], + Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)), + query_params.tags, + vec![col(TRACE_ID_COLUMN)], + ) + .await?; + + // Get all traces that match the trace ids from the previous query. + // It's equivalent to the following SQL query: + // + // ``` + // SELECT * + // FROM + // {db}.{trace_table} + // WHERE + // trace_id IN ({trace_ids}) AND + // timestamp >= {start_time} AND + // timestamp <= {end_time} + // ``` + let mut filters = vec![col(TRACE_ID_COLUMN).in_list( + trace_ids_from_output(output) + .await? + .iter() + .map(lit) + .collect::>(), + false, + )]; + + if let Some(start_time) = query_params.start_time { + filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time))); + } + + if let Some(end_time) = query_params.end_time { + filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time))); + } + Ok(query_trace_table( ctx, self.catalog_manager(), self.query_engine(), - selects, + vec![wildcard()], filters, - vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order. - Some(DEFAULT_LIMIT), - query_params.tags, + vec![], + None, + None, vec![], ) .await?) @@ -458,3 +517,34 @@ fn tags_filters( json_tag_filters(dataframe, tags) } } + +// Get trace ids from the output in recordbatches. +async fn trace_ids_from_output(output: Output) -> ServerResult> { + if let OutputData::Stream(stream) = output.data { + let schema = stream.schema().clone(); + let recordbatches = util::collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + + // Only contains `trace_id` column in string type. + if !recordbatches.is_empty() + && schema.num_columns() == 1 + && schema.contains_column(TRACE_ID_COLUMN) + { + let mut trace_ids = vec![]; + for recordbatch in recordbatches { + for col in recordbatch.columns().iter() { + for row_idx in 0..recordbatch.num_rows() { + if let ValueRef::String(value) = col.get_ref(row_idx) { + trace_ids.push(value.to_string()); + } + } + } + } + + return Ok(trace_ids); + } + } + + Ok(vec![]) +} diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index dc102b66d7..77a05e9f85 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt; +use std::str::FromStr; use std::sync::Arc; use axum::extract::{Path, Query, State}; @@ -26,7 +28,7 @@ use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; use common_recordbatch::util; use common_telemetry::{debug, error, tracing, warn}; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; use session::context::{Channel, QueryContext}; use snafu::{OptionExt, ResultExt}; @@ -215,6 +217,7 @@ pub struct JaegerQueryParams { pub operation_name: Option, /// Limit the return data. + #[serde(default, deserialize_with = "empty_string_as_none")] pub limit: Option, /// Start time of the trace in microseconds since unix epoch. @@ -224,9 +227,11 @@ pub struct JaegerQueryParams { pub end: Option, /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + #[serde(default, deserialize_with = "empty_string_as_none")] pub max_duration: Option, /// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + #[serde(default, deserialize_with = "empty_string_as_none")] pub min_duration: Option, /// Tags of the trace in JSON format. It will be URL encoded in the raw query. @@ -238,6 +243,20 @@ pub struct JaegerQueryParams { pub span_kind: Option, } +/// Serde deserialization decorator to map empty Strings to None. +fn empty_string_as_none<'de, D, T>(de: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: FromStr, + T::Err: fmt::Display, +{ + let opt = Option::::deserialize(de)?; + match opt.as_deref() { + None | Some("") => Ok(None), + Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some), + } +} + fn update_query_context(query_ctx: &mut QueryContext, table_name: Option) { // db should be already handled by middlewares query_ctx.set_channel(Channel::Jaeger); @@ -403,8 +422,12 @@ pub async fn handle_get_trace( .with_label_values(&[&db, "/api/traces"]) .start_timer(); + // Convert start time and end time from microseconds to nanoseconds. + let start_time_ns = query_params.start.map(|start_us| start_us * 1000); + let end_time_ns = query_params.end.map(|end_us| end_us * 1000); + let output = match handler - .get_trace(query_ctx, &trace_id, query_params.start, query_params.end) + .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns) .await { Ok(output) => output, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index da7d5c38b3..9c0e306fda 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -202,7 +202,10 @@ pub trait JaegerQueryHandler { end_time: Option, ) -> Result; - /// Get trace by trace id. It's used for `/api/traces/{trace_id}` API. + /// Retrieves a trace by its unique identifier. + /// + /// This method is used to handle requests to the `/api/traces/{trace_id}` endpoint. + /// It accepts optional `start_time` and `end_time` parameters in nanoseconds to filter the trace data within a specific time range. async fn get_trace( &self, ctx: QueryContextRef, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a2c04695c8..91c3afd22f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -4431,65 +4431,107 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); let expected = r#" { - "data": [ - { - "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spans": [ + "data": [ { - "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "008421dbbd33a3e9", - "operationName": "access-mysql", - "references": [], - "startTime": 1738726754492421, - "duration": 100000, - "tags": [ - { - "key": "net.peer.ip", - "type": "string", - "value": "1.2.3.4" - }, - { - "key": "operation.type", - "type": "string", - "value": "access-mysql" - }, - { - "key": "otel.scope.name", - "type": "string", - "value": "test-jaeger-query-api" - }, - { - "key": "otel.scope.version", - "type": "string", - "value": "1.0.0" - }, - { - "key": "peer.service", - "type": "string", - "value": "test-jaeger-query-api" - }, - { - "key": "span.kind", - "type": "string", - "value": "server" + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spans": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", + "references": [], + "startTime": 1738726754492421, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-mysql" + }, + { + "key": "otel.scope.name", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "otel.scope.version", + "type": "string", + "value": "1.0.0" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "span.kind", + "type": "string", + "value": "server" + } + ], + "logs": [], + "processID": "p1" + }, + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "ffa03416a7b9ea48", + "operationName": "access-redis", + "references": [], + "startTime": 1738726754492422, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-redis" + }, + { + "key": "otel.scope.name", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "otel.scope.version", + "type": "string", + "value": "1.0.0" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "span.kind", + "type": "string", + "value": "server" + } + ], + "logs": [], + "processID": "p1" + } + ], + "processes": { + "p1": { + "serviceName": "test-jaeger-query-api", + "tags": [] + } } - ], - "logs": [], - "processID": "p1" } - ], - "processes": { - "p1": { - "serviceName": "test-jaeger-query-api", - "tags": [] - } - } - } - ], - "total": 0, - "limit": 0, - "offset": 0, - "errors": [] + ], + "total": 0, + "limit": 0, + "offset": 0, + "errors": [] } "#; @@ -5006,66 +5048,108 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); let expected = r#" -{ - "data": [ - { - "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spans": [ + { + "data": [ { - "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "008421dbbd33a3e9", - "operationName": "access-mysql", - "references": [], - "startTime": 1738726754492421, - "duration": 100000, - "tags": [ - { - "key": "net.peer.ip", - "type": "string", - "value": "1.2.3.4" - }, - { - "key": "operation.type", - "type": "string", - "value": "access-mysql" - }, - { - "key": "otel.scope.name", - "type": "string", - "value": "test-jaeger-query-api" - }, - { - "key": "otel.scope.version", - "type": "string", - "value": "1.0.0" - }, - { - "key": "peer.service", - "type": "string", - "value": "test-jaeger-query-api" - }, - { - "key": "span.kind", - "type": "string", - "value": "server" + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spans": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", + "references": [], + "startTime": 1738726754492421, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-mysql" + }, + { + "key": "otel.scope.name", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "otel.scope.version", + "type": "string", + "value": "1.0.0" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "span.kind", + "type": "string", + "value": "server" + } + ], + "logs": [], + "processID": "p1" + }, + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "ffa03416a7b9ea48", + "operationName": "access-redis", + "references": [], + "startTime": 1738726754492422, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-redis" + }, + { + "key": "otel.scope.name", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "otel.scope.version", + "type": "string", + "value": "1.0.0" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + }, + { + "key": "span.kind", + "type": "string", + "value": "server" + } + ], + "logs": [], + "processID": "p1" + } + ], + "processes": { + "p1": { + "serviceName": "test-jaeger-query-api", + "tags": [] + } } - ], - "logs": [], - "processID": "p1" } - ], - "processes": { - "p1": { - "serviceName": "test-jaeger-query-api", - "tags": [] - } - } - } - ], - "total": 0, - "limit": 0, - "offset": 0, - "errors": [] + ], + "total": 0, + "limit": 0, + "offset": 0, + "errors": [] } "#;