chore: improve search traces and jaeger resp (#7166)

* chore: add jaeger field in trace query

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update search v1 with tags

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update col matching using col names

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minify code with macro

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change macro to inline function

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix filter with tags & add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-11-04 13:49:08 +08:00
committed by GitHub
parent 421f4eec05
commit 6caff50d01
6 changed files with 247 additions and 30 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use async_trait::async_trait;
@@ -28,6 +28,7 @@ use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
@@ -42,8 +43,9 @@ use servers::error::{
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
@@ -322,6 +324,7 @@ async fn query_trace_table(
})?;
let is_data_model_v1 = table
.clone()
.table_info()
.meta
.options
@@ -330,6 +333,14 @@ async fn query_trace_table(
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);
// Collect the double-quoted field column names into a set so tag keys can be matched against existing columns.
let col_names = table
.table_info()
.meta
.field_column_names()
.map(|s| format!("\"{}\"", s))
.collect::<HashSet<String>>();
let df_context = create_df_context(query_engine)?;
let dataframe = df_context
@@ -342,7 +353,7 @@ async fn query_trace_table(
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
tags_filters(&dataframe, t, is_data_model_v1, &col_names)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
@@ -472,23 +483,73 @@ fn json_tag_filters(
Ok(filters)
}
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
/// Resolves a tag key to an existing column and builds a filter expression on it.
///
/// `span_key` is looked up in `col_names` first, then `resource_key`. The first
/// candidate that is present is handed to `expr_builder`, and the resulting
/// expression is returned. When neither candidate is a known column, a warning
/// naming the original tag `key` is logged and `None` is returned.
#[inline]
fn check_col_and_build_expr<F>(
    span_key: String,
    resource_key: String,
    key: &str,
    col_names: &HashSet<String>,
    expr_builder: F,
) -> Option<Expr>
where
    F: FnOnce(String) -> Expr,
{
    // Pick the matching column name; the span-level candidate takes precedence.
    let column = if col_names.contains(&span_key) {
        span_key
    } else if col_names.contains(&resource_key) {
        resource_key
    } else {
        warn!("tag key {} not found in table columns", key);
        return None;
    };

    Some(expr_builder(column))
}
fn flatten_tag_filters(
tags: HashMap<String, JsonValue>,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
}
// TODO(shuiyisong): add more precise mapping from key to col name
let span_key = format!("\"span_attributes.{}\"", key);
let resource_key = format!("\"resource_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::String(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
let value = value.as_f64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
let value = value.as_i64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
JsonValue::Bool(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::Null => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).is_null()
})
}
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
@@ -502,9 +563,10 @@ fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
flatten_tag_filters(tags, col_names)
} else {
json_tag_filters(dataframe, tags)
}

View File

@@ -40,10 +40,12 @@ use crate::http::extractor::TraceTableName;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::query_handler::JaegerQueryHandlerRef;
@@ -654,7 +656,10 @@ async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>>
.await
.context(CollectRecordbatchSnafu)?,
)?;
debug!("The query records: {:?}", records);
debug!(
"The query records: {}",
serde_json::to_string(&records).unwrap()
);
Ok(Some(records))
}
// It's unlikely to happen. However, if the output is not a stream, return None.
@@ -859,6 +864,38 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status)),
});
// set error to comply with the Jaeger API
if span_status == SPAN_STATUS_ERROR {
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
value_type: ValueType::Boolean,
value: Value::Boolean(true),
});
}
}
}
SPAN_STATUS_MESSAGE_COLUMN => {
if let JsonValue::String(span_status_message) = cell
&& !span_status_message.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_MESSAGE.to_string(),
value_type: ValueType::String,
value: Value::String(span_status_message),
});
}
}
TRACE_STATE_COLUMN => {
if let JsonValue::String(trace_state) = cell
&& !trace_state.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_TRACE_STATE.to_string(),
value_type: ValueType::String,
value: Value::String(trace_state),
});
}
}

View File

@@ -34,11 +34,13 @@ pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_STATUS_CODE: &str = "span_status_code";
pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
pub const SPAN_EVENTS_COLUMN: &str = "span_events";
pub const SCOPE_NAME_COLUMN: &str = "scope_name";
pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
pub const TRACE_STATE_COLUMN: &str = "trace_state";
// const keys
pub const KEY_SERVICE_NAME: &str = "service.name";
@@ -49,6 +51,9 @@ pub const KEY_SPAN_KIND: &str = "span.kind";
pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
@@ -57,6 +62,7 @@ pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
// The span status code prefix in the database.
pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest

View File

@@ -26,8 +26,8 @@ use crate::error::Result;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
@@ -124,9 +124,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

View File

@@ -27,9 +27,10 @@ use crate::error::Result;
use crate::otlp::trace::attributes::Attributes;
use crate::otlp::trace::span::{TraceSpan, parse};
use crate::otlp::trace::{
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
};
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
@@ -135,11 +136,11 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
make_string_column_data("span_status_code", Some(span.span_status_code)),
make_string_column_data("span_status_message", Some(span.span_status_message)),
make_string_column_data("trace_state", Some(span.trace_state)),
make_string_column_data("scope_name", Some(span.scope_name)),
make_string_column_data("scope_version", Some(span.scope_version)),
make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;

View File

@@ -5633,6 +5633,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -5675,6 +5680,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -5750,6 +5760,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -5792,6 +5807,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -5975,6 +5995,34 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
}
}
]
},
{
"scope": {
"name": "test-jaeger-find-traces",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08fa",
"spanId": "ffa03416a7b9ea49",
"name": "access-pg",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-pg"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
@@ -6143,6 +6191,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6185,6 +6238,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6259,6 +6317,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6301,6 +6364,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6376,6 +6444,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6418,6 +6491,11 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "otel.status_description",
"type": "string",
"value": "success"
},
{
"key": "peer.service",
"type": "string",
@@ -6452,6 +6530,39 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API with tags.
// 1. first query without tags, get 2 results
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754492422&end=1738726754592422")
.header("x-greptime-trace-table-name", trace_table_name)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{"data":[{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-pg","processID":"p1","references":[],"spanID":"ffa03416a7b9ea49","startTime":1738726754492422,"tags":[{"key":"operation.type","type":"string","value":"access-pg"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-find-traces"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"},{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-redis","processID":"p1","references":[],"spanID":"ffa03416a7b9ea48","startTime":1738726754492422,"tags":[{"key":"net.peer.ip","type":"string","value":"1.2.3.4"},{"key":"operation.type","type":"string","value":"access-redis"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-query-api"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"peer.service","type":"string","value":"test-jaeger-query-api"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08ea"}],"traceID":"5611dce1bc9ebed65352d99a027b08ea"}],"errors":[],"limit":0,"offset":0,"total":0}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// 2. second query with tags, get 1 result
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&start=1738726754492422&end=1738726754592422&tags=%7B%22operation.type%22%3A%22access-pg%22%7D")
.header("x-greptime-trace-table-name", trace_table_name)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{"data":[{"processes":{"p1":{"serviceName":"test-jaeger-query-api","tags":[]}},"spans":[{"duration":100000,"logs":[],"operationName":"access-pg","processID":"p1","references":[],"spanID":"ffa03416a7b9ea49","startTime":1738726754492422,"tags":[{"key":"operation.type","type":"string","value":"access-pg"},{"key":"otel.scope.name","type":"string","value":"test-jaeger-find-traces"},{"key":"otel.scope.version","type":"string","value":"1.0.0"},{"key":"otel.status_description","type":"string","value":"success"},{"key":"span.kind","type":"string","value":"server"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"traceID":"5611dce1bc9ebed65352d99a027b08fa"}],"errors":[],"limit":0,"offset":0,"total":0}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
guard.remove_all().await;
}