chore: optimize search traces from Grafana (#7298)

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update ua setup

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-11-26 16:06:15 +08:00
committed by GitHub
parent 09d1074e23
commit 713525797a
3 changed files with 352 additions and 18 deletions

View File

@@ -32,15 +32,16 @@ use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion::functions_window::expr_fn::row_number;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
@@ -49,12 +50,14 @@ use servers::otlp::trace::{
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;
use crate::instance::Instance;
const DEFAULT_LIMIT: usize = 2000;
const KEY_RN: &str = "greptime_rn";
#[async_trait]
impl JaegerQueryHandler for Instance {
@@ -259,18 +262,41 @@ impl JaegerQueryHandler for Instance {
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
}
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
None,
None,
vec![],
)
.await?)
match query_params.user_agent {
TraceUserAgent::Grafana => {
// Grafana only uses the trace id and timestamp from this response;
// clicking a trace id invokes the query trace API,
// so a few spans per trace (the earliest 3) are enough here.
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);
let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;
Ok(find_traces_rank_3(
table,
self.query_engine(),
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
)
.await?)
}
_ => {
// query all spans
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
None,
None,
vec![],
)
.await?)
}
}
}
}
@@ -392,6 +418,84 @@ async fn query_trace_table(
Ok(output)
}
/// Looks up `table_name` in the catalog and schema currently selected by `ctx`.
///
/// # Errors
///
/// Fails with a catalog error when the lookup itself fails, and with a
/// table-not-found error (carrying the table, catalog and schema names) when the
/// lookup succeeds but yields no table.
async fn get_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    table_name: &str,
) -> ServerResult<TableRef> {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    // `None` here means the catalog answered but has no such table.
    let maybe_table = catalog_manager
        .table(catalog, &schema, table_name, Some(&ctx))
        .await
        .context(CatalogSnafu)?;

    maybe_table.with_context(|| TableNotFoundSnafu {
        table: table_name,
        catalog,
        schema,
    })
}
/// Queries `table` for rows matching `filters`, keeping at most the three earliest
/// spans (by timestamp) of every trace.
///
/// A `row_number()` window, partitioned by trace id and ordered by ascending
/// timestamp, is materialized as the `KEY_RN` column; rows numbered above the cap are
/// dropped. The `KEY_RN` column stays in the returned stream.
///
/// `sorts` is applied to the capped result rather than to the window's input: the
/// window imposes its own partitioning and ordering, so a sort placed before it is
/// not guaranteed to survive into the output.
///
/// # Errors
///
/// Plan construction and execution failures are reported through `DataFusionSnafu`;
/// a failure to adapt the resulting stream is reported through
/// `CollectRecordbatchSnafu`.
async fn find_traces_rank_3(
    table: TableRef,
    query_engine: &QueryEngineRef,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
) -> ServerResult<Output> {
    // Maximum number of spans kept per trace.
    const SPANS_PER_TRACE: i32 = 3;

    let df_context = create_df_context(query_engine)?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe
        .select(vec![wildcard()])
        .context(DataFusionSnafu)?;

    // Apply all filters.
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(DataFusionSnafu)
    })?;

    // Number the spans of each trace from the earliest one onwards.
    let trace_id_col = vec![col(TRACE_ID_COLUMN)];
    let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
    let row_number_expr = row_number()
        .partition_by(trace_id_col)
        .order_by(timestamp_asc)
        .build()
        .context(DataFusionSnafu)?;

    // Keep only the first `SPANS_PER_TRACE` spans of each trace.
    let dataframe = dataframe
        .with_column(KEY_RN, row_number_expr)
        .context(DataFusionSnafu)?
        .filter(col(KEY_RN).lt_eq(lit(SPANS_PER_TRACE)))
        .context(DataFusionSnafu)?;

    // Apply the requested ordering last so that it describes the rows actually returned.
    let dataframe = if sorts.is_empty() {
        dataframe
    } else {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    };

    // Execute the query and collect the result.
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
// The current implementation registers UDFs during the planning stage, which makes it difficult
// to utilize them through DataFrame APIs. To address this limitation, we create a new session
// context and register the required UDFs, allowing them to be decoupled from the global context.

View File

@@ -21,12 +21,14 @@ use axum::Extension;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
use axum::response::IntoResponse;
use axum_extra::TypedHeader;
use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::{debug, error, tracing, warn};
use headers::UserAgent;
use serde::{Deserialize, Deserializer, Serialize, de};
use serde_json::Value as JsonValue;
use session::context::{Channel, QueryContext};
@@ -340,6 +342,30 @@ pub struct QueryTraceParams {
pub end_time: Option<i64>,
pub min_duration: Option<u64>,
pub max_duration: Option<u64>,
// The user agent of the client issuing the trace query; mainly used when finding traces.
pub user_agent: TraceUserAgent,
}
/// The kind of client that issued a trace query, derived from its `User-Agent` header.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum TraceUserAgent {
/// The request came from Grafana.
Grafana,
// Jaeger-UI does not actually send a user agent,
// but this is a Jaeger API, so it is treated as Jaeger by default.
#[default]
Jaeger,
}
impl From<UserAgent> for TraceUserAgent {
    /// Classifies a `User-Agent` header value.
    ///
    /// The value is lower-cased before matching, so any casing of "grafana" maps to
    /// [`TraceUserAgent::Grafana`]; every other value maps to
    /// [`TraceUserAgent::Jaeger`].
    fn from(value: UserAgent) -> Self {
        let normalized = value.as_str().to_lowercase();
        debug!("received user agent: {}", normalized);

        if !normalized.contains("grafana") {
            return Self::Jaeger;
        }
        Self::Grafana
    }
}
/// Handle the GET `/api/services` request.
@@ -476,6 +502,7 @@ pub async fn handle_find_traces(
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
optional_user_agent: Option<TypedHeader<UserAgent>>,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
@@ -492,7 +519,10 @@ pub async fn handle_find_traces(
.start_timer();
match QueryTraceParams::from_jaeger_query_params(query_params) {
Ok(query_params) => {
Ok(mut query_params) => {
if let Some(TypedHeader(user_agent)) = optional_user_agent {
query_params.user_agent = user_agent.into();
}
let output = handler.find_traces(query_ctx, query_params).await;
match output {
Ok(output) => match covert_to_records(output).await {
@@ -1571,6 +1601,7 @@ mod tests {
("http.method".to_string(), JsonValue::String("GET".to_string())),
("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
])),
user_agent: TraceUserAgent::Jaeger,
},
),
];

View File

@@ -6065,6 +6065,94 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
}
}
]
},
{
"scope": {
"name": "test-jaeger-grafana-ua",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08fb",
"spanId": "ffa03416a7b9ea50",
"name": "access-span-1",
"kind": 2,
"startTimeUnixNano": "1738726754600000000",
"endTimeUnixNano": "1738726754700000000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-span-1"
}
}
],
"status": {
"message": "success",
"code": 0
}
},
{
"traceId": "5611dce1bc9ebed65352d99a027b08fb",
"spanId": "ffa03416a7b9ea51",
"name": "access-span-2",
"kind": 2,
"startTimeUnixNano": "1738726754600001000",
"endTimeUnixNano": "1738726754700001000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-span-2"
}
}
],
"status": {
"message": "success",
"code": 0
}
},
{
"traceId": "5611dce1bc9ebed65352d99a027b08fb",
"spanId": "ffa03416a7b9ea52",
"name": "access-span-3",
"kind": 2,
"startTimeUnixNano": "1738726754600002000",
"endTimeUnixNano": "1738726754700002000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-span-3"
}
}
],
"status": {
"message": "success",
"code": 0
}
},
{
"traceId": "5611dce1bc9ebed65352d99a027b08fb",
"spanId": "ffa03416a7b9ea53",
"name": "access-span-4",
"kind": 2,
"startTimeUnixNano": "1738726754600003000",
"endTimeUnixNano": "1738726754700003000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-span-4"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
@@ -6179,9 +6267,25 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
{
"name": "access-redis",
"spanKind": "server"
},
{
"name": "access-span-1",
"spanKind": "server"
},
{
"name": "access-span-2",
"spanKind": "server"
},
{
"name": "access-span-3",
"spanKind": "server"
},
{
"name": "access-span-4",
"spanKind": "server"
}
],
"total": 3,
"total": 7,
"limit": 0,
"offset": 0,
"errors": []
@@ -6203,9 +6307,13 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"data": [
"access-mysql",
"access-pg",
"access-redis"
"access-redis",
"access-span-1",
"access-span-2",
"access-span-3",
"access-span-4"
],
"total": 3,
"total": 7,
"limit": 0,
"offset": 0,
"errors": []
@@ -6627,6 +6735,97 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API with Grafana User-Agent.
// When user agent is Grafana, only return at most 3 spans per trace (earliest by timestamp).
// Trace `5611dce1bc9ebed65352d99a027b08fb` has 4 spans, so only 3 should be returned.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754600000&end=1738726754700004")
.header("x-greptime-trace-table-name", trace_table_name)
.header("User-Agent", "Grafana/8.0.0")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
// Verify that the trace has exactly 3 spans (limited by Grafana user agent)
let data = resp.get("data").unwrap().as_array().unwrap();
assert_eq!(data.len(), 1, "Expected 1 trace");
let trace = &data[0];
assert_eq!(
trace.get("traceID").unwrap().as_str().unwrap(),
"5611dce1bc9ebed65352d99a027b08fb"
);
let spans = trace.get("spans").unwrap().as_array().unwrap();
assert_eq!(
spans.len(),
3,
"Expected 3 spans (limited by Grafana user agent), got {}",
spans.len()
);
// Verify that the 3 earliest spans are returned (by timestamp ascending)
let span_names: Vec<&str> = spans
.iter()
.map(|s| s.get("operationName").unwrap().as_str().unwrap())
.collect();
assert!(
span_names.contains(&"access-span-1"),
"Expected access-span-1 in spans"
);
assert!(
span_names.contains(&"access-span-2"),
"Expected access-span-2 in spans"
);
assert!(
span_names.contains(&"access-span-3"),
"Expected access-span-3 in spans"
);
assert!(
!span_names.contains(&"access-span-4"),
"access-span-4 should NOT be in spans (4th span)"
);
// Test `/api/traces` API without User-Agent (default behavior).
// All 4 spans should be returned for the trace.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754600000&end=1738726754700004")
.header("x-greptime-trace-table-name", trace_table_name)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let data = resp.get("data").unwrap().as_array().unwrap();
assert_eq!(data.len(), 1, "Expected 1 trace");
let trace = &data[0];
let spans = trace.get("spans").unwrap().as_array().unwrap();
assert_eq!(
spans.len(),
4,
"Expected 4 spans (no user agent limit), got {}",
spans.len()
);
// Test `/api/traces` API with Jaeger User-Agent (should return all spans like default).
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754600000&end=1738726754700004")
.header("x-greptime-trace-table-name", trace_table_name)
.header("User-Agent", "Jaeger-Query/1.0.0")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let data = resp.get("data").unwrap().as_array().unwrap();
assert_eq!(data.len(), 1, "Expected 1 trace");
let trace = &data[0];
let spans = trace.get("spans").unwrap().as_array().unwrap();
assert_eq!(
spans.len(),
4,
"Expected 4 spans (Jaeger user agent, no limit), got {}",
spans.len()
);
guard.remove_all().await;
}