fix(jaeger-api): incorrect find_traces() logic and multiple API compatibility issues (#6293)

* fix: use the `limit` param in the Jaeger HTTP API

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: only parse `max_duration` and `min_duration` when they are not empty

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: handle an empty `limit` string in the input

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: add the missing filter for `service_name`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: fix ci errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: incorrect behavior of find_traces

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: the logic of `find_traces()`

The correct logic should be:

1. Get all trace ids that match the filters;

2. Get all traces that match the trace ids from the previous query;
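
A minimal sketch of the two steps as plain SQL built from Rust, using hypothetical table and parameter values; the actual change builds DataFusion expressions rather than SQL strings:

```rust
// Sketch only: hypothetical names; the real implementation builds DataFusion
// expressions instead of SQL strings.
fn build_find_traces_queries(
    trace_table: &str,  // e.g. "public.opentelemetry_traces" (hypothetical)
    service_name: &str,
    limit: usize,
    trace_ids: &[&str], // trace ids returned by the first query
) -> (String, String) {
    // Step 1: distinct trace ids matching the filters; the user-supplied
    // `limit` applies to trace ids, not to individual spans.
    let trace_id_query = format!(
        "SELECT DISTINCT trace_id FROM {trace_table} \
         WHERE service_name = '{service_name}' LIMIT {limit}"
    );
    // Step 2: fetch every span of those traces so no trace is truncated.
    let id_list = trace_ids
        .iter()
        .map(|id| format!("'{id}'"))
        .collect::<Vec<_>>()
        .join(", ");
    let spans_query =
        format!("SELECT * FROM {trace_table} WHERE trace_id IN ({id_list})");
    (trace_id_query, spans_query)
}

fn main() {
    let (step1, step2) = build_find_traces_queries(
        "public.opentelemetry_traces",
        "test-jaeger-query-api",
        20,
        &["5611dce1bc9ebed65352d99a027b08ea"],
    );
    println!("{step1}\n{step2}");
}
```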

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: integration test errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `empty_string_as_none`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: refine naming

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
zyy17
2025-06-18 16:01:36 +08:00
committed by GitHub
parent 4d4bfb7d8b
commit f6ce6fe385
4 changed files with 329 additions and 129 deletions

View File

@@ -24,12 +24,14 @@ use common_function::scalars::json::json_get::{
};
use common_function::scalars::udf::create_udf;
use common_function::state::FunctionState;
use common_query::Output;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
use datatypes::value::ValueRef;
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
@@ -97,7 +99,7 @@ impl JaegerQueryHandler for Instance {
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
// It's equivalent to
// It's equivalent to the following SQL query:
//
// ```
// SELECT DISTINCT span_name, span_kind
@@ -137,7 +139,7 @@ impl JaegerQueryHandler for Instance {
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
// It's equivalent to
// It's equivalent to the following SQL query:
//
// ```
// SELECT
@@ -156,13 +158,11 @@ impl JaegerQueryHandler for Instance {
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
}
Ok(query_trace_table(
@@ -184,10 +184,11 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects = vec![wildcard()];
let mut filters = vec![];
// `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
if let Some(operation_name) = query_params.operation_name {
filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
}
@@ -208,15 +209,73 @@ impl JaegerQueryHandler for Instance {
filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
}
// Get all distinct trace ids that match the filters.
// It's equivalent to the following SQL query:
//
// ```
// SELECT DISTINCT trace_id
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}' AND
// operation_name = '{operation_name}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time} AND
// duration >= {min_duration} AND
// duration <= {max_duration}
// LIMIT {limit}
// ```.
let output = query_trace_table(
ctx.clone(),
self.catalog_manager(),
self.query_engine(),
vec![wildcard()],
filters,
vec![],
Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
query_params.tags,
vec![col(TRACE_ID_COLUMN)],
)
.await?;
// Get all traces that match the trace ids from the previous query.
// It's equivalent to the following SQL query:
//
// ```
// SELECT *
// FROM
// {db}.{trace_table}
// WHERE
// trace_id IN ({trace_ids}) AND
// timestamp >= {start_time} AND
// timestamp <= {end_time}
// ```
let mut filters = vec![col(TRACE_ID_COLUMN).in_list(
trace_ids_from_output(output)
.await?
.iter()
.map(lit)
.collect::<Vec<Expr>>(),
false,
)];
if let Some(start_time) = query_params.start_time {
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
}
if let Some(end_time) = query_params.end_time {
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
}
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
selects,
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
Some(DEFAULT_LIMIT),
query_params.tags,
vec![],
None,
None,
vec![],
)
.await?)
@@ -458,3 +517,34 @@ fn tags_filters(
json_tag_filters(dataframe, tags)
}
}
// Get trace ids from the recordbatches in the output.
async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
if let OutputData::Stream(stream) = output.data {
let schema = stream.schema().clone();
let recordbatches = util::collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
// Only contains `trace_id` column in string type.
if !recordbatches.is_empty()
&& schema.num_columns() == 1
&& schema.contains_column(TRACE_ID_COLUMN)
{
let mut trace_ids = vec![];
for recordbatch in recordbatches {
for col in recordbatch.columns().iter() {
for row_idx in 0..recordbatch.num_rows() {
if let ValueRef::String(value) = col.get_ref(row_idx) {
trace_ids.push(value.to_string());
}
}
}
}
return Ok(trace_ids);
}
}
Ok(vec![])
}

View File

@@ -13,6 +13,8 @@
// limitations under the License.
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use axum::extract::{Path, Query, State};
@@ -26,7 +28,7 @@ use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::{debug, error, tracing, warn};
use serde::{Deserialize, Serialize};
use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use session::context::{Channel, QueryContext};
use snafu::{OptionExt, ResultExt};
@@ -215,6 +217,7 @@ pub struct JaegerQueryParams {
pub operation_name: Option<String>,
/// Limit the return data.
#[serde(default, deserialize_with = "empty_string_as_none")]
pub limit: Option<usize>,
/// Start time of the trace in microseconds since unix epoch.
@@ -224,9 +227,11 @@ pub struct JaegerQueryParams {
pub end: Option<i64>,
/// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
#[serde(default, deserialize_with = "empty_string_as_none")]
pub max_duration: Option<String>,
/// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
#[serde(default, deserialize_with = "empty_string_as_none")]
pub min_duration: Option<String>,
/// Tags of the trace in JSON format. It will be URL encoded in the raw query.
@@ -238,6 +243,20 @@ pub struct JaegerQueryParams {
pub span_kind: Option<String>,
}
/// Serde deserialization decorator to map empty Strings to None.
fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: FromStr,
T::Err: fmt::Display,
{
let opt = Option::<String>::deserialize(de)?;
match opt.as_deref() {
None | Some("") => Ok(None),
Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
}
}
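// Illustrative sketch (hypothetical, not part of this change): how
// `empty_string_as_none` treats the `limit` query parameter. Assumes the
// `serde_urlencoded` crate, which uses the same urlencoded representation as
// axum's `Query` extractor.
#[cfg(test)]
mod empty_string_as_none_example {
    use super::*;

    #[derive(Deserialize)]
    struct Params {
        #[serde(default, deserialize_with = "empty_string_as_none")]
        limit: Option<usize>,
    }

    #[test]
    fn empty_limit_is_none() {
        // `?limit=` arrives as an empty string and maps to `None` instead of
        // failing to parse as `usize`.
        let params: Params = serde_urlencoded::from_str("limit=").unwrap();
        assert_eq!(params.limit, None);

        // A concrete value still goes through `usize::from_str`.
        let params: Params = serde_urlencoded::from_str("limit=20").unwrap();
        assert_eq!(params.limit, Some(20));

        // A missing `limit` falls back to the serde `default` (`None`).
        let params: Params = serde_urlencoded::from_str("").unwrap();
        assert_eq!(params.limit, None);
    }
}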
fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
// db should be already handled by middlewares
query_ctx.set_channel(Channel::Jaeger);
@@ -403,8 +422,12 @@ pub async fn handle_get_trace(
.with_label_values(&[&db, "/api/traces"])
.start_timer();
// Convert start time and end time from microseconds to nanoseconds.
let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
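// For example, a `start` of 1738726754492421 µs (the span start time used in
// the integration test) becomes 1738726754492421000 ns after this conversion.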
let output = match handler
.get_trace(query_ctx, &trace_id, query_params.start, query_params.end)
.get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
.await
{
Ok(output) => output,

View File

@@ -202,7 +202,10 @@ pub trait JaegerQueryHandler {
end_time: Option<i64>,
) -> Result<Output>;
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
/// Retrieves a trace by its unique identifier.
///
/// This method is used to handle requests to the `/api/traces/{trace_id}` endpoint.
/// It accepts optional `start_time` and `end_time` parameters in nanoseconds to filter the trace data within a specific time range.
async fn get_trace(
&self,
ctx: QueryContextRef,

View File

@@ -4431,65 +4431,107 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
  "data": [
    {
      "traceID": "5611dce1bc9ebed65352d99a027b08ea",
      "spans": [
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "008421dbbd33a3e9",
          "operationName": "access-mysql",
          "references": [],
          "startTime": 1738726754492421,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-mysql"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        }
      ],
      "processes": {
        "p1": {
          "serviceName": "test-jaeger-query-api",
          "tags": []
        }
      }
    }
  ],
  "total": 0,
  "limit": 0,
  "offset": 0,
  "errors": []
}

{
  "data": [
    {
      "traceID": "5611dce1bc9ebed65352d99a027b08ea",
      "spans": [
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "008421dbbd33a3e9",
          "operationName": "access-mysql",
          "references": [],
          "startTime": 1738726754492421,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-mysql"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        },
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "ffa03416a7b9ea48",
          "operationName": "access-redis",
          "references": [],
          "startTime": 1738726754492422,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-redis"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        }
      ],
      "processes": {
        "p1": {
          "serviceName": "test-jaeger-query-api",
          "tags": []
        }
      }
    }
  ],
  "total": 0,
  "limit": 0,
  "offset": 0,
  "errors": []
}
"#;
@@ -5006,66 +5048,108 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
  "data": [
    {
      "traceID": "5611dce1bc9ebed65352d99a027b08ea",
      "spans": [
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "008421dbbd33a3e9",
          "operationName": "access-mysql",
          "references": [],
          "startTime": 1738726754492421,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-mysql"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        }
      ],
      "processes": {
        "p1": {
          "serviceName": "test-jaeger-query-api",
          "tags": []
        }
      }
    }
  ],
  "total": 0,
  "limit": 0,
  "offset": 0,
  "errors": []
}

{
  "data": [
    {
      "traceID": "5611dce1bc9ebed65352d99a027b08ea",
      "spans": [
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "008421dbbd33a3e9",
          "operationName": "access-mysql",
          "references": [],
          "startTime": 1738726754492421,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-mysql"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        },
        {
          "traceID": "5611dce1bc9ebed65352d99a027b08ea",
          "spanID": "ffa03416a7b9ea48",
          "operationName": "access-redis",
          "references": [],
          "startTime": 1738726754492422,
          "duration": 100000,
          "tags": [
            {
              "key": "net.peer.ip",
              "type": "string",
              "value": "1.2.3.4"
            },
            {
              "key": "operation.type",
              "type": "string",
              "value": "access-redis"
            },
            {
              "key": "otel.scope.name",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "otel.scope.version",
              "type": "string",
              "value": "1.0.0"
            },
            {
              "key": "peer.service",
              "type": "string",
              "value": "test-jaeger-query-api"
            },
            {
              "key": "span.kind",
              "type": "string",
              "value": "server"
            }
          ],
          "logs": [],
          "processID": "p1"
        }
      ],
      "processes": {
        "p1": {
          "serviceName": "test-jaeger-query-api",
          "tags": []
        }
      }
    }
  ],
  "total": 0,
  "limit": 0,
  "offset": 0,
  "errors": []
}
"#;