refactor: add time range for jager get operations API (#5791)

* refactor: add default time range for jager get operations API

* refactor: use desc order for timestamp colomn

* chore: modify http header name
This commit is contained in:
zyy17
2025-04-07 17:07:31 +08:00
committed by GitHub
parent 21a209f7ba
commit cf1440fc32
4 changed files with 181 additions and 55 deletions

View File

@@ -28,7 +28,7 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
@@ -60,9 +60,10 @@ impl JaegerQueryHandler for Instance {
self.query_engine(),
vec![col(SERVICE_NAME_COLUMN)],
vec![],
vec![],
Some(DEFAULT_LIMIT),
None,
true,
vec![col(SERVICE_NAME_COLUMN)],
)
.await?)
}
@@ -72,6 +73,8 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
@@ -83,18 +86,29 @@ impl JaegerQueryHandler for Instance {
))));
}
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
// It's equivalent to
//
// ```
// SELECT
// span_name,
// span_kind
// SELECT DISTINCT span_name, span_kind
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}'
// service_name = '{service_name}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time} AND
// span_kind = '{span_kind}'
// ORDER BY
// timestamp
// span_name ASC
// ```.
Ok(query_trace_table(
ctx,
@@ -104,11 +118,13 @@ impl JaegerQueryHandler for Instance {
col(SPAN_NAME_COLUMN),
col(SPAN_KIND_COLUMN),
col(SERVICE_NAME_COLUMN),
col(TIMESTAMP_COLUMN),
],
filters,
vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
Some(DEFAULT_LIMIT),
None,
false,
vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
)
.await?)
}
@@ -124,7 +140,7 @@ impl JaegerQueryHandler for Instance {
// WHERE
// trace_id = '{trace_id}'
// ORDER BY
// timestamp
// timestamp DESC
// ```.
let selects = vec![wildcard()];
@@ -136,9 +152,10 @@ impl JaegerQueryHandler for Instance {
self.query_engine(),
selects,
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
Some(DEFAULT_LIMIT),
None,
false,
vec![],
)
.await?)
}
@@ -178,9 +195,10 @@ impl JaegerQueryHandler for Instance {
self.query_engine(),
selects,
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
Some(DEFAULT_LIMIT),
query_params.tags,
false,
vec![],
)
.await?)
}
@@ -193,9 +211,10 @@ async fn query_trace_table(
query_engine: &QueryEngineRef,
selects: Vec<Expr>,
filters: Vec<Expr>,
sorts: Vec<SortExpr>,
limit: Option<usize>,
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
distincts: Vec<Expr>,
) -> ServerResult<Output> {
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
@@ -244,13 +263,19 @@ async fn query_trace_table(
})?;
// Apply the distinct if needed.
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
// for non distinct query, sort by timestamp to make results stable
let dataframe = if !distincts.is_empty() {
dataframe
.sort_by(vec![col(TIMESTAMP_COLUMN)])
.distinct_on(distincts.clone(), distincts, None)
.context(DataFusionSnafu)?
} else {
dataframe
};
// Apply the sorts if needed.
let dataframe = if !sorts.is_empty() {
dataframe.sort(sorts).context(DataFusionSnafu)?
} else {
dataframe
};
// Apply the limit if needed.

View File

@@ -16,9 +16,10 @@ use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode as HttpStatusCode;
use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use chrono::Utc;
use common_catalog::consts::PARENT_SPAN_ID_COLUMN;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -49,6 +50,7 @@ pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
/// JaegerAPIResponse is the response of Jaeger HTTP API.
/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
@@ -489,11 +491,20 @@ pub async fn handle_get_operations(
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
headers: HeaderMap,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
"Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
query_params, query_ctx, headers
);
let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
Ok((start, end)) => (start, end),
Err(e) => return error_response(e),
};
debug!("Get operations with start: {:?}, end: {:?}", start, end);
if let Some(service_name) = &query_params.service_name {
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
@@ -505,7 +516,13 @@ pub async fn handle_get_operations(
.start_timer();
match handler
.get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
.get_operations(
query_ctx,
service_name,
query_params.span_kind.as_deref(),
start,
end,
)
.await
{
Ok(output) => match covert_to_records(output).await {
@@ -562,10 +579,11 @@ pub async fn handle_get_operations_by_service(
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
headers: HeaderMap,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}",
service_name, query_params, query_ctx
"Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
service_name, query_params, query_ctx, headers
);
update_query_context(&mut query_ctx, table_name);
@@ -577,7 +595,15 @@ pub async fn handle_get_operations_by_service(
.with_label_values(&[&db, "/api/services"])
.start_timer();
match handler.get_operations(query_ctx, &service_name, None).await {
let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
Ok((start, end)) => (start, end),
Err(e) => return error_response(e),
};
match handler
.get_operations(query_ctx, &service_name, None, start, end)
.await
{
Ok(output) => match covert_to_records(output).await {
Ok(Some(records)) => match operations_from_records(records, false) {
Ok(operations) => {
@@ -969,11 +995,7 @@ fn operations_from_records(
records: HttpRecordsOutput,
contain_span_kind: bool,
) -> Result<Vec<Operation>> {
let expected_schema = vec![
(SPAN_NAME_COLUMN, "String"),
(SPAN_KIND_COLUMN, "String"),
(SERVICE_NAME_COLUMN, "String"),
];
let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
check_schema(&records, &expected_schema)?;
let mut operations = Vec::with_capacity(records.total_rows);
@@ -1064,6 +1086,42 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Va
None
}
fn parse_jaeger_time_range_for_operations(
headers: &HeaderMap,
query_params: &JaegerQueryParams,
) -> Result<(Option<i64>, Option<i64>)> {
if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
match time_range.to_str() {
Ok(time_range) => match humantime::parse_duration(time_range) {
Ok(duration) => {
debug!(
"Get operations with time range: {:?}, duration: {:?}",
time_range, duration
);
let now = Utc::now().timestamp_micros();
Ok((Some(now - duration.as_micros() as i64), Some(now)))
}
Err(e) => {
error!("Failed to parse time range header: {:?}", e);
Err(InvalidJaegerQuerySnafu {
reason: format!("invalid time range header: {:?}", time_range),
}
.build())
}
},
Err(e) => {
error!("Failed to convert time range header to string: {:?}", e);
Err(InvalidJaegerQuerySnafu {
reason: format!("invalid time range header: {:?}", time_range),
}
.build())
}
}
} else {
Ok((query_params.start, query_params.end))
}
}
#[cfg(test)]
mod tests {
use serde_json::{json, Number, Value as JsonValue};

View File

@@ -189,6 +189,8 @@ pub trait JaegerQueryHandler {
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<Output>;
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.

View File

@@ -19,6 +19,7 @@ use std::str::FromStr;
use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
@@ -34,6 +35,7 @@ use serde_json::{json, Value};
use servers::http::handler::HealthResponse;
use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER;
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
@@ -3108,7 +3110,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
// Test `/api/operations` API.
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api&start=1738726754492421&end=1738726754642422")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3136,7 +3138,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3170,10 +3172,10 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492421,
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
@@ -3184,7 +3186,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
"value": "access-redis"
},
{
"key": "otel.scope.name",
@@ -3212,10 +3214,10 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -3226,7 +3228,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
"value": "access-mysql"
},
{
"key": "otel.scope.name",
@@ -3472,6 +3474,34 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
}
}
]
},
{
"scope": {
"name": "test-jaeger-get-operations",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ff",
"spanId": "ffa03416a7b9ea48",
"name": "access-pg",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-pg"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
@@ -3480,8 +3510,22 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
}
"#;
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let mut req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
// Modify timestamp fields
let now = Utc::now().timestamp_nanos_opt().unwrap() as u64;
for span in req.resource_spans.iter_mut() {
for scope_span in span.scope_spans.iter_mut() {
// Only modify the timestamp fields for the span with the name "test-jaeger-get-operations" to current time.
if scope_span.scope.as_ref().unwrap().name == "test-jaeger-get-operations" {
for span in scope_span.spans.iter_mut() {
span.start_time_unix_nano = now - 5_000_000_000; // 5 seconds ago
span.end_time_unix_nano = now;
}
}
}
}
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
@@ -3532,6 +3576,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.header("x-greptime-trace-table-name", "mytable")
.header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3539,15 +3584,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-redis",
"name": "access-pg",
"spanKind": "server"
}
],
"total": 2,
"total": 1,
"limit": 0,
"offset": 0,
"errors": []
@@ -3559,7 +3600,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
@@ -3594,10 +3635,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492421,
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
@@ -3608,7 +3649,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
"value": "access-redis"
},
{
"key": "otel.scope.name",
@@ -3636,10 +3677,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -3650,7 +3691,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
"value": "access-mysql"
},
{
"key": "otel.scope.name",