From cf1440fc327c825f0e915a042fe6a687ac9caa5d Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 7 Apr 2025 17:07:31 +0800 Subject: [PATCH] refactor: add time range for jager get operations API (#5791) * refactor: add default time range for jager get operations API * refactor: use desc order for timestamp colomn * chore: modify http header name --- src/frontend/src/instance/jaeger.rs | 59 ++++++++++++------ src/servers/src/http/jaeger.rs | 82 +++++++++++++++++++++---- src/servers/src/query_handler.rs | 2 + tests-integration/tests/http.rs | 93 +++++++++++++++++++++-------- 4 files changed, 181 insertions(+), 55 deletions(-) diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 94bffb75ce..edd94d450d 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -28,7 +28,7 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter; use datafusion::dataframe::DataFrame; use datafusion::execution::context::SessionContext; use datafusion::execution::SessionStateBuilder; -use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr}; +use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr}; use query::QueryEngineRef; use serde_json::Value as JsonValue; use servers::error::{ @@ -60,9 +60,10 @@ impl JaegerQueryHandler for Instance { self.query_engine(), vec![col(SERVICE_NAME_COLUMN)], vec![], + vec![], Some(DEFAULT_LIMIT), None, - true, + vec![col(SERVICE_NAME_COLUMN)], ) .await?) } @@ -72,6 +73,8 @@ impl JaegerQueryHandler for Instance { ctx: QueryContextRef, service_name: &str, span_kind: Option<&str>, + start_time: Option, + end_time: Option, ) -> ServerResult { let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))]; @@ -83,18 +86,29 @@ impl JaegerQueryHandler for Instance { )))); } + if let Some(start_time) = start_time { + // Microseconds to nanoseconds. + filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000))); + } + + if let Some(end_time) = end_time { + // Microseconds to nanoseconds. + filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000))); + } + // It's equivalent to // // ``` - // SELECT - // span_name, - // span_kind + // SELECT DISTINCT span_name, span_kind // FROM // {db}.{trace_table} // WHERE - // service_name = '{service_name}' + // service_name = '{service_name}' AND + // timestamp >= {start_time} AND + // timestamp <= {end_time} AND + // span_kind = '{span_kind}' // ORDER BY - // timestamp + // span_name ASC // ```. Ok(query_trace_table( ctx, @@ -104,11 +118,13 @@ impl JaegerQueryHandler for Instance { col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN), col(SERVICE_NAME_COLUMN), + col(TIMESTAMP_COLUMN), ], filters, + vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order. Some(DEFAULT_LIMIT), None, - false, + vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)], ) .await?) } @@ -124,7 +140,7 @@ impl JaegerQueryHandler for Instance { // WHERE // trace_id = '{trace_id}' // ORDER BY - // timestamp + // timestamp DESC // ```. let selects = vec![wildcard()]; @@ -136,9 +152,10 @@ impl JaegerQueryHandler for Instance { self.query_engine(), selects, filters, + vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order. Some(DEFAULT_LIMIT), None, - false, + vec![], ) .await?) } @@ -178,9 +195,10 @@ impl JaegerQueryHandler for Instance { self.query_engine(), selects, filters, + vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order. Some(DEFAULT_LIMIT), query_params.tags, - false, + vec![], ) .await?) } @@ -193,9 +211,10 @@ async fn query_trace_table( query_engine: &QueryEngineRef, selects: Vec, filters: Vec, + sorts: Vec, limit: Option, tags: Option>, - distinct: bool, + distincts: Vec, ) -> ServerResult { let table_name = ctx .extension(JAEGER_QUERY_TABLE_NAME_KEY) @@ -244,13 +263,19 @@ async fn query_trace_table( })?; // Apply the distinct if needed. - let dataframe = if distinct { - dataframe.distinct().context(DataFusionSnafu)? - } else { - // for non distinct query, sort by timestamp to make results stable + let dataframe = if !distincts.is_empty() { dataframe - .sort_by(vec![col(TIMESTAMP_COLUMN)]) + .distinct_on(distincts.clone(), distincts, None) .context(DataFusionSnafu)? + } else { + dataframe + }; + + // Apply the sorts if needed. + let dataframe = if !sorts.is_empty() { + dataframe.sort(sorts).context(DataFusionSnafu)? + } else { + dataframe }; // Apply the limit if needed. diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index 82c72e97d2..e50b22e2b5 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -16,9 +16,10 @@ use std::collections::HashMap; use std::sync::Arc; use axum::extract::{Path, Query, State}; -use axum::http::StatusCode as HttpStatusCode; +use axum::http::{HeaderMap, StatusCode as HttpStatusCode}; use axum::response::IntoResponse; use axum::Extension; +use chrono::Utc; use common_catalog::consts::PARENT_SPAN_ID_COLUMN; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -49,6 +50,7 @@ pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name"; const REF_TYPE_CHILD_OF: &str = "CHILD_OF"; const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"]; +pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range"; /// JaegerAPIResponse is the response of Jaeger HTTP API. /// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go. @@ -489,11 +491,20 @@ pub async fn handle_get_operations( Query(query_params): Query, Extension(mut query_ctx): Extension, TraceTableName(table_name): TraceTableName, + headers: HeaderMap, ) -> impl IntoResponse { debug!( - "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}", - query_params, query_ctx + "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}", + query_params, query_ctx, headers ); + + let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) { + Ok((start, end)) => (start, end), + Err(e) => return error_response(e), + }; + + debug!("Get operations with start: {:?}, end: {:?}", start, end); + if let Some(service_name) = &query_params.service_name { update_query_context(&mut query_ctx, table_name); let query_ctx = Arc::new(query_ctx); @@ -505,7 +516,13 @@ pub async fn handle_get_operations( .start_timer(); match handler - .get_operations(query_ctx, service_name, query_params.span_kind.as_deref()) + .get_operations( + query_ctx, + service_name, + query_params.span_kind.as_deref(), + start, + end, + ) .await { Ok(output) => match covert_to_records(output).await { @@ -562,10 +579,11 @@ pub async fn handle_get_operations_by_service( Query(query_params): Query, Extension(mut query_ctx): Extension, TraceTableName(table_name): TraceTableName, + headers: HeaderMap, ) -> impl IntoResponse { debug!( - "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}", - service_name, query_params, query_ctx + "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}", + service_name, query_params, query_ctx, headers ); update_query_context(&mut query_ctx, table_name); @@ -577,7 +595,15 @@ pub async fn handle_get_operations_by_service( .with_label_values(&[&db, "/api/services"]) .start_timer(); - match handler.get_operations(query_ctx, &service_name, None).await { + let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) { + Ok((start, end)) => (start, end), + Err(e) => return error_response(e), + }; + + match handler + .get_operations(query_ctx, &service_name, None, start, end) + .await + { Ok(output) => match covert_to_records(output).await { Ok(Some(records)) => match operations_from_records(records, false) { Ok(operations) => { @@ -969,11 +995,7 @@ fn operations_from_records( records: HttpRecordsOutput, contain_span_kind: bool, ) -> Result> { - let expected_schema = vec![ - (SPAN_NAME_COLUMN, "String"), - (SPAN_KIND_COLUMN, "String"), - (SERVICE_NAME_COLUMN, "String"), - ]; + let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")]; check_schema(&records, &expected_schema)?; let mut operations = Vec::with_capacity(records.total_rows); @@ -1064,6 +1086,42 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option Result<(Option, Option)> { + if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) { + match time_range.to_str() { + Ok(time_range) => match humantime::parse_duration(time_range) { + Ok(duration) => { + debug!( + "Get operations with time range: {:?}, duration: {:?}", + time_range, duration + ); + let now = Utc::now().timestamp_micros(); + Ok((Some(now - duration.as_micros() as i64), Some(now))) + } + Err(e) => { + error!("Failed to parse time range header: {:?}", e); + Err(InvalidJaegerQuerySnafu { + reason: format!("invalid time range header: {:?}", time_range), + } + .build()) + } + }, + Err(e) => { + error!("Failed to convert time range header to string: {:?}", e); + Err(InvalidJaegerQuerySnafu { + reason: format!("invalid time range header: {:?}", time_range), + } + .build()) + } + } + } else { + Ok((query_params.start, query_params.end)) + } +} + #[cfg(test)] mod tests { use serde_json::{json, Number, Value as JsonValue}; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index fbc19f160d..b4e734fca2 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -189,6 +189,8 @@ pub trait JaegerQueryHandler { ctx: QueryContextRef, service_name: &str, span_kind: Option<&str>, + start_time: Option, + end_time: Option, ) -> Result; /// Get trace by trace id. It's used for `/api/traces/{trace_id}` API. diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 018d4e2196..8cb67e49ec 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; use axum::http::{HeaderName, HeaderValue, StatusCode}; +use chrono::Utc; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; @@ -34,6 +35,7 @@ use serde_json::{json, Value}; use servers::http::handler::HealthResponse; use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME; use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME}; +use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; use servers::http::result::error_result::ErrorResponse; use servers::http::result::greptime_result_v1::GreptimedbV1Response; @@ -3108,7 +3110,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { // Test `/api/operations` API. let res = client - .get("/v1/jaeger/api/operations?service=test-jaeger-query-api") + .get("/v1/jaeger/api/operations?service=test-jaeger-query-api&start=1738726754492421&end=1738726754642422") .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3136,7 +3138,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { // Test `/api/services/{service_name}/operations` API. let res = client - .get("/v1/jaeger/api/services/test-jaeger-query-api/operations") + .get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422") .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3170,10 +3172,10 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { "spans": [ { "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "008421dbbd33a3e9", - "operationName": "access-mysql", + "spanID": "ffa03416a7b9ea48", + "operationName": "access-redis", "references": [], - "startTime": 1738726754492421, + "startTime": 1738726754492422, "duration": 100000, "tags": [ { @@ -3184,7 +3186,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { { "key": "operation.type", "type": "string", - "value": "access-mysql" + "value": "access-redis" }, { "key": "otel.scope.name", @@ -3212,10 +3214,10 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { }, { "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "ffa03416a7b9ea48", - "operationName": "access-redis", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", "references": [], - "startTime": 1738726754492422, + "startTime": 1738726754492421, "duration": 100000, "tags": [ { @@ -3226,7 +3228,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { { "key": "operation.type", "type": "string", - "value": "access-redis" + "value": "access-mysql" }, { "key": "otel.scope.name", @@ -3472,6 +3474,34 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { } } ] + }, + { + "scope": { + "name": "test-jaeger-get-operations", + "version": "1.0.0" + }, + "spans": [ + { + "traceId": "5611dce1bc9ebed65352d99a027b08ff", + "spanId": "ffa03416a7b9ea48", + "name": "access-pg", + "kind": 2, + "startTimeUnixNano": "1738726754492422000", + "endTimeUnixNano": "1738726754592422000", + "attributes": [ + { + "key": "operation.type", + "value": { + "stringValue": "access-pg" + } + } + ], + "status": { + "message": "success", + "code": 0 + } + } + ] } ], "schemaUrl": "https://opentelemetry.io/schemas/1.4.0" @@ -3480,8 +3510,22 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { } "#; - let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); + let mut req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); + // Modify timestamp fields + let now = Utc::now().timestamp_nanos_opt().unwrap() as u64; + for span in req.resource_spans.iter_mut() { + for scope_span in span.scope_spans.iter_mut() { + // Only modify the timestamp fields for the span with the name "test-jaeger-get-operations" to current time. + if scope_span.scope.as_ref().unwrap().name == "test-jaeger-get-operations" { + for span in scope_span.spans.iter_mut() { + span.start_time_unix_nano = now - 5_000_000_000; // 5 seconds ago + span.end_time_unix_nano = now; + } + } + } + } let body = req.encode_to_vec(); + // write traces data. let res = send_req( &client, @@ -3532,6 +3576,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { let res = client .get("/v1/jaeger/api/operations?service=test-jaeger-query-api") .header("x-greptime-trace-table-name", "mytable") + .header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days") .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3539,15 +3584,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { { "data": [ { - "name": "access-mysql", - "spanKind": "server" - }, - { - "name": "access-redis", + "name": "access-pg", "spanKind": "server" } ], - "total": 2, + "total": 1, "limit": 0, "offset": 0, "errors": [] @@ -3559,7 +3600,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/services/{service_name}/operations` API. let res = client - .get("/v1/jaeger/api/services/test-jaeger-query-api/operations") + .get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422") .header("x-greptime-trace-table-name", "mytable") .send() .await; @@ -3594,10 +3635,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { "spans": [ { "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "008421dbbd33a3e9", - "operationName": "access-mysql", + "spanID": "ffa03416a7b9ea48", + "operationName": "access-redis", "references": [], - "startTime": 1738726754492421, + "startTime": 1738726754492422, "duration": 100000, "tags": [ { @@ -3608,7 +3649,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { { "key": "operation.type", "type": "string", - "value": "access-mysql" + "value": "access-redis" }, { "key": "otel.scope.name", @@ -3636,10 +3677,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { }, { "traceID": "5611dce1bc9ebed65352d99a027b08ea", - "spanID": "ffa03416a7b9ea48", - "operationName": "access-redis", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", "references": [], - "startTime": 1738726754492422, + "startTime": 1738726754492421, "duration": 100000, "tags": [ { @@ -3650,7 +3691,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { { "key": "operation.type", "type": "string", - "value": "access-redis" + "value": "access-mysql" }, { "key": "otel.scope.name",