refactor: update jaeger api implementation for new trace modeling (#5655)

* refactor: update jaeger api implementation

* test: add tests for v1 data model

* feat: customize trace table name

* fix: update column requirements to use Column type instead of String

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: lint fix

* refactor: accumulate resource attributes for v1

* fix: add empty check for additional string

* feat: add table option to mark data model version

* fix: do not overwrite all tags

* feat: use table option to mark table data model version and process accordingly

* chore: update comments to reflect query changes

* feat: use header for jaeger table name

* feat: update index for service_name, drop index for span_name

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
Authored by Ning Sun on 2025-03-17 00:31:32 -07:00, committed by GitHub
parent 09dacc8e9b
commit 2260782c12
10 changed files with 985 additions and 299 deletions
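The custom trace table introduced above ("feat: use header for jaeger table name") is selected per request via the `x-greptime-trace-table-name` header, as exercised by the integration tests in this diff. A minimal client-side sketch, not part of this PR, assuming a locally running GreptimeDB HTTP endpoint plus the `reqwest` and `tokio` crates; host, port, and the table name "mytable" are illustrative:

```rust
// Illustrative only: query the Jaeger-compatible API against a custom trace
// table. The header name and path come from this PR's tests; the endpoint and
// table name are assumptions for the example.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = Client::new()
        .get("http://127.0.0.1:4000/v1/jaeger/api/services")
        .header("x-greptime-trace-table-name", "mytable")
        .send()
        .await?
        .text()
        .await?;
    println!("{body}");
    Ok(())
}
```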

Cargo.lock

@@ -11930,6 +11930,7 @@ dependencies = [
"operator",
"partition",
"paste",
"pipeline",
"prost 0.13.3",
"query",
"rand",


@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----


@@ -28,14 +28,14 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
@@ -43,6 +43,7 @@ use servers::otlp::trace::{
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;
use super::Instance;
@@ -82,7 +83,19 @@ impl JaegerQueryHandler for Instance {
))));
}
// It's equivalent to `SELECT span_name, span_kind FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
// It's equivalent to
//
// ```
// SELECT
// span_name,
// span_kind
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}'
// ORDER BY
// timestamp
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
@@ -101,9 +114,19 @@ impl JaegerQueryHandler for Instance {
}
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
// It's equivalent to
//
// ```
// SELECT
// *
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}'
// ORDER BY
// timestamp
// ```.
let selects = vec![wildcard()];
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
@@ -125,7 +148,7 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
let selects = vec![wildcard()];
let mut filters = vec![];
@@ -174,17 +197,34 @@ async fn query_trace_table(
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
) -> ServerResult<Output> {
let db = ctx.get_db_string();
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);
let table = catalog_manager
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
.table(
ctx.current_catalog(),
&ctx.current_schema(),
table_name,
Some(&ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table: TRACE_TABLE_NAME,
table: table_name,
catalog: ctx.current_catalog(),
schema: db,
schema: ctx.current_schema(),
})?;
let is_data_model_v1 = table
.table_info()
.meta
.options
.extra_options
.get(TABLE_DATA_MODEL)
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);
let df_context = create_df_context(query_engine, ctx.clone())?;
let dataframe = df_context
@@ -196,7 +236,9 @@ async fn query_trace_table(
// Apply all filters.
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
})?;
@@ -205,7 +247,10 @@ async fn query_trace_table(
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
// for non-distinct queries, sort by timestamp to make results stable
dataframe
.sort_by(vec![col(TIMESTAMP_COLUMN)])
.context(DataFusionSnafu)?
};
// Apply the limit if needed.
@@ -237,7 +282,7 @@ fn create_df_context(
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
);
// The following JSON UDFs will be used for tags filters.
// The following JSON UDFs will be used for tags filters on v0 data model.
let udfs: Vec<FunctionRef> = vec![
Arc::new(JsonGetInt),
Arc::new(JsonGetFloat),
@@ -256,7 +301,7 @@ fn create_df_context(
Ok(df_context)
}
fn tags_filters(
fn json_tag_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
@@ -322,3 +367,41 @@ fn tags_filters(
Ok(filters)
}
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
}
})
.collect();
Ok(filters)
}
fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
} else {
json_tag_filters(dataframe, tags)
}
}
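To make the v0/v1 split above concrete: on the v1 (flattened) data model a Jaeger tag filter compiles to a plain column comparison on the quoted `span_attributes.*` column, with no JSON UDF involved. A small sketch mirroring `flatten_tag_filters`; the key and value below are illustrative:

```rust
// Sketch of the v1 tag-filter path: one Jaeger tag becomes one DataFusion
// predicate on a flattened attribute column.
use datafusion_expr::{col, lit, Expr};

fn flat_tag_filter(key: &str, value: &str) -> Expr {
    // The key is prefixed and quoted so it resolves to the flattened
    // `"span_attributes.<key>"` column created by the v1 pipeline.
    col(format!("\"span_attributes.{}\"", key)).eq(lit(value))
}

// e.g. flat_tag_filter("http.request.method", "GET") filters rows where that
// flattened column equals "GET".
```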


@@ -90,6 +90,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
@@ -101,10 +103,17 @@ impl OpenTelemetryProtocolHandler for Instance {
OTLP_TRACES_ROWS.inc_by(rows as u64);
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
if is_trace_v1_model {
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
#[tracing::instrument(skip_all)]


@@ -28,7 +28,7 @@ use api::v1::{
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
@@ -54,7 +54,10 @@ use store_api::metric_engine_consts::{
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::requests::{
InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
TABLE_DATA_MODEL_TRACE_V1, TTL_KEY,
};
use table::table_reference::TableReference;
use table::TableRef;
@@ -578,7 +581,8 @@ impl Inserter {
// - trace_id: when searching by trace id
// - parent_span_id: when searching root span
// - span_name: when searching certain types of span
let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
let index_columns =
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
for index_column in index_columns {
if let Some(col) = create_table
.column_defs
@@ -595,6 +599,12 @@ impl Inserter {
}
}
// use table_options to mark table model version
create_table.table_options.insert(
TABLE_DATA_MODEL.to_string(),
TABLE_DATA_MODEL_TRACE_V1.to_string(),
);
let table = self
.create_physical_table(
create_table,

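The option written here is the same one the Jaeger query handler reads back (see the frontend changes above) to choose between flattened-column and JSON tag filters. A condensed, hypothetical pairing of the two sides, with the option maps simplified to plain `HashMap<String, String>` for illustration:

```rust
use std::collections::HashMap;

use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};

// Write side (inserter): stamp the trace v1 data model onto the
// auto-created trace table's options.
fn mark_trace_v1(table_options: &mut HashMap<String, String>) {
    table_options.insert(
        TABLE_DATA_MODEL.to_string(),
        TABLE_DATA_MODEL_TRACE_V1.to_string(),
    );
}

// Read side (Jaeger handler): detect the model from the stored options.
fn is_trace_v1(extra_options: &HashMap<String, String>) -> bool {
    extra_options.get(TABLE_DATA_MODEL).map(|s| s.as_str()) == Some(TABLE_DATA_MODEL_TRACE_V1)
}
```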

@@ -30,5 +30,5 @@ pub use etl::{
pub use manager::{
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
};


@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::sync::Arc;
@@ -26,8 +25,6 @@ use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::{debug, error, tracing, warn};
use datafusion_expr::{col, Expr};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use session::context::{Channel, QueryContext};
@@ -36,6 +33,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{
status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
};
use crate::http::extractor::TraceTableName;
use crate::http::HttpRecordsOutput;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
@@ -47,43 +45,9 @@ use crate::otlp::trace::{
};
use crate::query_handler::JaegerQueryHandlerRef;
lazy_static! {
pub static ref FIND_TRACES_COLS: Vec<Expr> = vec![
col(TRACE_ID_COLUMN),
col(TIMESTAMP_COLUMN),
col(DURATION_NANO_COLUMN),
col(SERVICE_NAME_COLUMN),
col(SPAN_NAME_COLUMN),
col(SPAN_ID_COLUMN),
col(SPAN_ATTRIBUTES_COLUMN),
col(RESOURCE_ATTRIBUTES_COLUMN),
col(PARENT_SPAN_ID_COLUMN),
col(SPAN_EVENTS_COLUMN),
col(SCOPE_NAME_COLUMN),
col(SCOPE_VERSION_COLUMN),
col(SPAN_KIND_COLUMN),
col(SPAN_STATUS_CODE),
];
static ref FIND_TRACES_SCHEMA: Vec<(&'static str, &'static str)> = vec![
(TRACE_ID_COLUMN, "String"),
(TIMESTAMP_COLUMN, "TimestampNanosecond"),
(DURATION_NANO_COLUMN, "UInt64"),
(SERVICE_NAME_COLUMN, "String"),
(SPAN_NAME_COLUMN, "String"),
(SPAN_ID_COLUMN, "String"),
(SPAN_ATTRIBUTES_COLUMN, "Json"),
(RESOURCE_ATTRIBUTES_COLUMN, "Json"),
(PARENT_SPAN_ID_COLUMN, "String"),
(SPAN_EVENTS_COLUMN, "Json"),
(SCOPE_NAME_COLUMN, "String"),
(SCOPE_VERSION_COLUMN, "String"),
(SPAN_KIND_COLUMN, "String"),
(SPAN_STATUS_CODE, "String"),
];
}
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
/// JaegerAPIResponse is the response of Jaeger HTTP API.
@@ -240,9 +204,6 @@ pub enum ValueType {
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JaegerQueryParams {
/// Database that the trace data stored in.
pub db: Option<String>,
/// Service name of the trace.
#[serde(rename = "service")]
pub service_name: Option<String>,
@@ -275,26 +236,27 @@ pub struct JaegerQueryParams {
pub span_kind: Option<String>,
}
fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
// db should already be handled by the middleware
query_ctx.set_channel(Channel::Jaeger);
if let Some(table) = table_name {
query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
}
}
impl QueryTraceParams {
fn from_jaeger_query_params(db: &str, query_params: JaegerQueryParams) -> Result<Self> {
fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
let mut internal_query_params: QueryTraceParams = QueryTraceParams {
db: db.to_string(),
service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
reason: "service_name is required".to_string(),
})?,
operation_name: query_params.operation_name,
// Convert start time from microseconds to nanoseconds.
start_time: query_params.start.map(|start| start * 1000),
end_time: query_params.end.map(|end| end * 1000),
..Default::default()
};
internal_query_params.service_name =
query_params.service_name.context(InvalidJaegerQuerySnafu {
reason: "service_name is required".to_string(),
})?;
internal_query_params.operation_name = query_params.operation_name;
// Convert start time from microseconds to nanoseconds.
internal_query_params.start_time = query_params.start.map(|start| start * 1000);
// Convert end time from microseconds to nanoseconds.
internal_query_params.end_time = query_params.end.map(|end| end * 1000);
if let Some(max_duration) = query_params.max_duration {
let duration = humantime::parse_duration(&max_duration).map_err(|e| {
InvalidJaegerQuerySnafu {
@@ -343,7 +305,6 @@ impl QueryTraceParams {
#[derive(Debug, Default, PartialEq)]
pub struct QueryTraceParams {
pub db: String,
pub service_name: String,
pub operation_name: Option<String>,
@@ -367,12 +328,14 @@ pub async fn handle_get_services(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -418,12 +381,14 @@ pub async fn handle_get_trace(
Path(trace_id): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
trace_id, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -472,12 +437,14 @@ pub async fn handle_find_traces(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -486,7 +453,7 @@ pub async fn handle_find_traces(
.with_label_values(&[&db, "/api/traces"])
.start_timer();
match QueryTraceParams::from_jaeger_query_params(&db, query_params) {
match QueryTraceParams::from_jaeger_query_params(query_params) {
Ok(query_params) => {
let output = handler.find_traces(query_ctx, query_params).await;
match output {
@@ -521,13 +488,14 @@ pub async fn handle_get_operations(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
if let Some(service_name) = query_params.service_name {
query_ctx.set_channel(Channel::Jaeger);
if let Some(service_name) = &query_params.service_name {
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -537,7 +505,7 @@ pub async fn handle_get_operations(
.start_timer();
match handler
.get_operations(query_ctx, &service_name, query_params.span_kind.as_deref())
.get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
.await
{
Ok(output) => match covert_to_records(output).await {
@@ -593,12 +561,14 @@ pub async fn handle_get_operations_by_service(
Path(service_name): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}",
service_name, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -690,11 +660,8 @@ fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>)
}),
)
}
// Construct Jaeger traces from records.
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
let expected_schema = FIND_TRACES_SCHEMA.clone();
check_schema(&records, &expected_schema)?;
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
// maintain the mapping: trace_id -> (process_id -> service_name).
let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
// maintain the mapping: trace_id -> spans.
@@ -702,38 +669,202 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
// maintain the mapping: service.name -> resource.attributes.
let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
let is_span_attributes_flatten = !records
.schema
.column_schemas
.iter()
.any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
for row in records.rows.into_iter() {
let mut span = Span::default();
let mut row_iter = row.into_iter();
let mut service_name = None;
let mut resource_tags = vec![];
// Set trace id.
if let Some(JsonValue::String(trace_id)) = row_iter.next() {
span.trace_id = trace_id.clone();
trace_id_to_processes.entry(trace_id).or_default();
}
for (idx, cell) in row.into_iter().enumerate() {
// safe to use index here
let column_name = &records.schema.column_schemas[idx].name;
// Convert timestamp from nanoseconds to microseconds.
if let Some(JsonValue::Number(timestamp)) = row_iter.next() {
span.start_time = timestamp.as_u64().ok_or_else(|| {
InvalidJaegerQuerySnafu {
reason: "Failed to convert timestamp to u64".to_string(),
match column_name.as_str() {
TRACE_ID_COLUMN => {
if let JsonValue::String(trace_id) = cell {
span.trace_id = trace_id.clone();
trace_id_to_processes.entry(trace_id).or_default();
}
}
.build()
})? / 1000;
}
// Convert duration from nanoseconds to microseconds.
if let Some(JsonValue::Number(duration)) = row_iter.next() {
span.duration = duration.as_u64().ok_or_else(|| {
InvalidJaegerQuerySnafu {
reason: "Failed to convert duration to u64".to_string(),
TIMESTAMP_COLUMN => {
span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
reason: "Failed to convert timestamp to u64".to_string(),
})? / 1000;
}
.build()
})? / 1000;
DURATION_NANO_COLUMN => {
span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
reason: "Failed to convert duration to u64".to_string(),
})? / 1000;
}
SERVICE_NAME_COLUMN => {
if let JsonValue::String(name) = cell {
service_name = Some(name);
}
}
SPAN_NAME_COLUMN => {
if let JsonValue::String(span_name) = cell {
span.operation_name = span_name;
}
}
SPAN_ID_COLUMN => {
if let JsonValue::String(span_id) = cell {
span.span_id = span_id;
}
}
SPAN_ATTRIBUTES_COLUMN => {
// for v0 data model, span_attributes are nested as a json
// data structure
if let JsonValue::Object(span_attrs) = cell {
span.tags.extend(object_to_tags(span_attrs));
}
}
RESOURCE_ATTRIBUTES_COLUMN => {
// for v0 data model, resource_attributes are nested as a json
// data structure
if let JsonValue::Object(mut resource_attrs) = cell {
resource_attrs.remove(KEY_SERVICE_NAME);
resource_tags = object_to_tags(resource_attrs);
}
}
PARENT_SPAN_ID_COLUMN => {
if let JsonValue::String(parent_span_id) = cell {
if !parent_span_id.is_empty() {
span.references.push(Reference {
trace_id: span.trace_id.clone(),
span_id: parent_span_id,
ref_type: REF_TYPE_CHILD_OF.to_string(),
});
}
}
}
SPAN_EVENTS_COLUMN => {
if let JsonValue::Array(events) = cell {
for event in events {
if let JsonValue::Object(mut obj) = event {
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
continue;
};
let Some(t) =
obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
SPAN_KIND_TIME_FMTS
.iter()
.find_map(|fmt| {
chrono::DateTime::parse_from_str(s, fmt).ok()
})
.map(|dt| dt.timestamp_micros() as u64)
})
else {
continue;
};
let mut fields = vec![KeyValue {
key: "event".to_string(),
value_type: ValueType::String,
value: Value::String(action.to_string()),
}];
// Add event attributes as fields
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
fields.extend(object_to_tags(attrs));
}
span.logs.push(Log {
timestamp: t,
fields,
});
}
}
}
}
SCOPE_NAME_COLUMN => {
if let JsonValue::String(scope_name) = cell {
if !scope_name.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_NAME.to_string(),
value_type: ValueType::String,
value: Value::String(scope_name),
});
}
}
}
SCOPE_VERSION_COLUMN => {
if let JsonValue::String(scope_version) = cell {
if !scope_version.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_VERSION.to_string(),
value_type: ValueType::String,
value: Value::String(scope_version),
});
}
}
}
SPAN_KIND_COLUMN => {
if let JsonValue::String(span_kind) = cell {
if !span_kind.is_empty() {
span.tags.push(KeyValue {
key: KEY_SPAN_KIND.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_span_kind(&span_kind)),
});
}
}
}
SPAN_STATUS_CODE => {
if let JsonValue::String(span_status) = cell {
if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_CODE.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status)),
});
}
}
}
_ => {
// this is the v1 data model
if is_span_attributes_flatten {
const SPAN_ATTR_PREFIX: &str = "span_attributes.";
const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
// a span attributes column
if column_name.starts_with(SPAN_ATTR_PREFIX) {
if let Some(keyvalue) = to_keyvalue(
column_name
.strip_prefix(SPAN_ATTR_PREFIX)
.unwrap_or_default()
.to_string(),
cell,
) {
span.tags.push(keyvalue);
}
} else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
if let Some(keyvalue) = to_keyvalue(
column_name
.strip_prefix(RESOURCE_ATTR_PREFIX)
.unwrap_or_default()
.to_string(),
cell,
) {
resource_tags.push(keyvalue);
}
}
}
}
}
}
// Collect services to construct processes.
if let Some(JsonValue::String(service_name)) = row_iter.next() {
if let Some(service_name) = service_name {
if !service_to_resource_attributes.contains_key(&service_name) {
service_to_resource_attributes.insert(service_name.clone(), resource_tags);
}
if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
if let Some(process_id) = process.get(&service_name) {
span.process_id = process_id.clone();
@@ -746,127 +877,8 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
}
}
// Set operation name. In Jaeger, the operation name is the span name.
if let Some(JsonValue::String(span_name)) = row_iter.next() {
span.operation_name = span_name;
}
// Set span id.
if let Some(JsonValue::String(span_id)) = row_iter.next() {
span.span_id = span_id;
}
// Convert span attributes to tags.
if let Some(JsonValue::Object(object)) = row_iter.next() {
span.tags = object_to_tags(object);
}
// Save resource attributes with service name.
if let Some(JsonValue::Object(mut object)) = row_iter.next() {
if let Some(service_name) = object
.remove(KEY_SERVICE_NAME)
.and_then(|v| v.as_str().map(|s| s.to_string()))
{
match service_to_resource_attributes.entry(service_name) {
Occupied(_) => {}
Vacant(vacant) => {
let _ = vacant.insert(object_to_tags(object));
}
}
}
}
// Set parent span id.
if let Some(JsonValue::String(parent_span_id)) = row_iter.next()
&& !parent_span_id.is_empty()
{
span.references.push(Reference {
trace_id: span.trace_id.clone(),
span_id: parent_span_id,
ref_type: REF_TYPE_CHILD_OF.to_string(),
});
}
// Set span events to logs.
if let Some(JsonValue::Array(events)) = row_iter.next() {
for event in events {
if let JsonValue::Object(mut obj) = event {
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
continue;
};
let Some(t) = obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
SPAN_KIND_TIME_FMTS
.iter()
.find_map(|fmt| chrono::DateTime::parse_from_str(s, fmt).ok())
.map(|dt| dt.timestamp_micros() as u64)
}) else {
continue;
};
let mut fields = vec![KeyValue {
key: "event".to_string(),
value_type: ValueType::String,
value: Value::String(action.to_string()),
}];
// Add event attributes as fields
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
fields.extend(object_to_tags(attrs));
}
span.logs.push(Log {
timestamp: t,
fields,
});
}
}
}
// Set scope name.
if let Some(JsonValue::String(scope_name)) = row_iter.next()
&& !scope_name.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_NAME.to_string(),
value_type: ValueType::String,
value: Value::String(scope_name),
});
}
// Set scope version.
if let Some(JsonValue::String(scope_version)) = row_iter.next()
&& !scope_version.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_VERSION.to_string(),
value_type: ValueType::String,
value: Value::String(scope_version),
});
}
// Set span kind.
if let Some(JsonValue::String(span_kind)) = row_iter.next()
&& !span_kind.is_empty()
{
span.tags.push(KeyValue {
key: KEY_SPAN_KIND.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_span_kind(&span_kind)),
});
}
// Set span status code.
if let Some(JsonValue::String(span_status_code)) = row_iter.next()
&& span_status_code != SPAN_STATUS_UNSET
&& !span_status_code.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_CODE.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status_code)),
});
}
// ensure span tags order
span.tags.sort_by(|a, b| a.key.cmp(&b.key));
if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
spans.push(span);
@@ -899,42 +911,41 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
Ok(traces)
}
#[inline]
fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Null => None,
}
}
fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
object
.into_iter()
.filter_map(|(key, value)| match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
// FIXME(zyy17): Do we need to support other types?
_ => {
warn!("Unsupported value type: {:?}", value);
None
}
})
.filter_map(|(key, value)| to_keyvalue(key, value))
.collect()
}
@@ -1055,7 +1066,6 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Va
#[cfg(test)]
mod tests {
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use serde_json::{json, Number, Value as JsonValue};
use super::*;
@@ -1301,6 +1311,151 @@ mod tests {
}
}
#[test]
fn test_traces_from_v1_records() {
// Each test is a tuple of `(test_records, expected)`.
let tests = vec![(
HttpRecordsOutput {
schema: OutputSchema {
column_schemas: vec![
ColumnSchema {
name: "trace_id".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "timestamp".to_string(),
data_type: "TimestampNanosecond".to_string(),
},
ColumnSchema {
name: "duration_nano".to_string(),
data_type: "UInt64".to_string(),
},
ColumnSchema {
name: "service_name".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_name".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_id".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.request.method".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.request.url".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.status_code".to_string(),
data_type: "UInt64".to_string(),
},
],
},
rows: vec![
vec![
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
JsonValue::Number(Number::from_u128(100000000).unwrap()),
JsonValue::String("test-service-0".to_string()),
JsonValue::String("access-mysql".to_string()),
JsonValue::String("008421dbbd33a3e9".to_string()),
JsonValue::String("GET".to_string()),
JsonValue::String("/data".to_string()),
JsonValue::Number(Number::from_u128(200).unwrap()),
],
vec![
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
JsonValue::Number(Number::from_u128(100000000).unwrap()),
JsonValue::String("test-service-0".to_string()),
JsonValue::String("access-redis".to_string()),
JsonValue::String("ffa03416a7b9ea48".to_string()),
JsonValue::String("POST".to_string()),
JsonValue::String("/create".to_string()),
JsonValue::Number(Number::from_u128(400).unwrap()),
],
],
total_rows: 2,
metrics: HashMap::new(),
},
vec![Trace {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
spans: vec![
Span {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
span_id: "008421dbbd33a3e9".to_string(),
operation_name: "access-mysql".to_string(),
start_time: 1738726754492422,
duration: 100000,
tags: vec![
KeyValue {
key: "http.request.method".to_string(),
value_type: ValueType::String,
value: Value::String("GET".to_string()),
},
KeyValue {
key: "http.request.url".to_string(),
value_type: ValueType::String,
value: Value::String("/data".to_string()),
},
KeyValue {
key: "http.status_code".to_string(),
value_type: ValueType::Int64,
value: Value::Int64(200),
},
],
process_id: "p1".to_string(),
..Default::default()
},
Span {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
span_id: "ffa03416a7b9ea48".to_string(),
operation_name: "access-redis".to_string(),
start_time: 1738726754642422,
duration: 100000,
tags: vec![
KeyValue {
key: "http.request.method".to_string(),
value_type: ValueType::String,
value: Value::String("POST".to_string()),
},
KeyValue {
key: "http.request.url".to_string(),
value_type: ValueType::String,
value: Value::String("/create".to_string()),
},
KeyValue {
key: "http.status_code".to_string(),
value_type: ValueType::Int64,
value: Value::Int64(400),
},
],
process_id: "p1".to_string(),
..Default::default()
},
],
processes: HashMap::from([(
"p1".to_string(),
Process {
service_name: "test-service-0".to_string(),
tags: vec![],
},
)]),
..Default::default()
}],
)];
for (records, expected) in tests {
let traces = traces_from_records(records).unwrap();
assert_eq!(traces, expected);
}
}
#[test]
fn test_from_jaeger_query_params() {
// Each test is a tuple of `(test_query_params, expected)`.
@@ -1311,7 +1466,6 @@ mod tests {
..Default::default()
},
QueryTraceParams {
db: DEFAULT_SCHEMA_NAME.to_string(),
service_name: "test-service-0".to_string(),
..Default::default()
},
@@ -1329,7 +1483,6 @@ mod tests {
..Default::default()
},
QueryTraceParams {
db: DEFAULT_SCHEMA_NAME.to_string(),
service_name: "test-service-0".to_string(),
operation_name: Some("access-mysql".to_string()),
start_time: Some(1738726754492422000),
@@ -1349,9 +1502,7 @@ mod tests {
];
for (query_params, expected) in tests {
let query_params =
QueryTraceParams::from_jaeger_query_params(DEFAULT_SCHEMA_NAME, query_params)
.unwrap();
let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
assert_eq!(query_params, expected);
}
}
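In the v1 branch of `traces_from_records` above, every flattened `span_attributes.*` / `resource_attributes.*` cell goes through `to_keyvalue`. A hypothetical extra unit test, in the style of the tests in this module, sketching that mapping (keys and values are illustrative):

```rust
#[test]
fn test_to_keyvalue_mapping() {
    use serde_json::json;

    // Numbers become Int64 tags.
    let tag = to_keyvalue("http.status_code".to_string(), json!(200)).unwrap();
    assert_eq!(tag.key, "http.status_code");
    assert_eq!(tag.value, Value::Int64(200));

    // Strings become String tags.
    let tag = to_keyvalue("http.request.method".to_string(), json!("GET")).unwrap();
    assert_eq!(tag.value, Value::String("GET".to_string()));

    // Null cells produce no tag at all.
    assert!(to_keyvalue("missing".to_string(), json!(null)).is_none());
}
```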


@@ -43,6 +43,9 @@ pub const FILE_TABLE_LOCATION_KEY: &str = "location";
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
pub const FILE_TABLE_FORMAT_KEY: &str = "format";
pub const TABLE_DATA_MODEL: &str = "table_data_model";
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
/// Returns true if the `key` is a valid key for any engine or storage.
pub fn validate_table_option(key: &str) -> bool {
if is_supported_in_s3(key) {
@@ -70,6 +73,8 @@ pub fn validate_table_option(key: &str) -> bool {
// metric engine keys:
PHYSICAL_TABLE_METADATA_KEY,
LOGICAL_TABLE_METADATA_KEY,
// table model info
TABLE_DATA_MODEL,
]
.contains(&key)
}
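With `TABLE_DATA_MODEL` added to the allow-list, the option survives generic validation and can appear in the generated `WITH (...)` clause (see the DDL expectation in the tests below). A trivial, hypothetical check:

```rust
#[test]
fn test_table_data_model_is_valid_option() {
    // "table_data_model" is now an accepted generic table option key.
    assert!(validate_table_option(TABLE_DATA_MODEL));
}
```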


@@ -92,6 +92,7 @@ itertools.workspace = true
opentelemetry-proto.workspace = true
partition.workspace = true
paste.workspace = true
pipeline.workspace = true
prost.workspace = true
rand.workspace = true
session = { workspace = true, features = ["testing"] }


@@ -28,6 +28,7 @@ use loki_proto::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
use prost::Message;
use serde_json::{json, Value};
use servers::http::handler::HealthResponse;
@@ -105,6 +106,7 @@ macro_rules! http_tests {
test_elasticsearch_logs_with_index,
test_log_query,
test_jaeger_query_api,
test_jaeger_query_api_for_trace_v1,
);
)*
};
@@ -2160,7 +2162,6 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
const TRACE_V1: &str = "greptime_trace_v1";
let content = r#"
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
@@ -2182,7 +2183,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
@@ -2200,7 +2201,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -2223,7 +2224,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
@@ -2729,8 +2730,8 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
@@ -2807,6 +2808,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
@@ -2906,7 +2908,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -2919,11 +2921,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2934,6 +2931,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -2961,11 +2963,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-redis"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2976,6 +2973,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3008,7 +3010,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492422&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3023,7 +3025,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -3036,11 +3038,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -3051,6 +3048,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3084,6 +3086,429 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_jaeger_query_api_v1").await;
let client = TestClient::new(app).await;
// Test empty response for `/api/services` API before writing any traces.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": null,
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
let content = r#"
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-mysql"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
},
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "ffa03416a7b9ea48",
"name": "access-redis",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-redis"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
}
]
}
"#;
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static("mytable"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// Test `/api/services` API.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"test-jaeger-query-api"
],
"total": 1,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/operations` API.
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-redis",
"spanKind": "server"
}
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"access-mysql",
"access-redis"
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())