diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 2bc5db9824..7dd6da9b4f 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -150,4 +150,9 @@ pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name"; pub fn trace_services_table_name(trace_table_name: &str) -> String { format!("{}_services", trace_table_name) } + +/// Generate the trace operations table name from the trace table name by adding `_operations` suffix. +pub fn trace_operations_table_name(trace_table_name: &str) -> String { + format!("{}_operations", trace_table_name) +} // ---- End of special table and fields ---- diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 6208866db2..7654f7bb41 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; -use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name}; +use common_catalog::consts::{ + TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name, +}; use common_function::function::FunctionRef; use common_function::scalars::json::json_get::{ JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString, @@ -76,8 +78,6 @@ impl JaegerQueryHandler for Instance { ctx: QueryContextRef, service_name: &str, span_kind: Option<&str>, - start_time: Option, - end_time: Option, ) -> ServerResult { let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))]; @@ -89,16 +89,6 @@ impl JaegerQueryHandler for Instance { )))); } - if let Some(start_time) = start_time { - // Microseconds to nanoseconds. - filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000))); - } - - if let Some(end_time) = end_time { - // Microseconds to nanoseconds. 
- filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000))); - } - // It's equivalent to the following SQL query: // // ``` @@ -107,8 +97,6 @@ impl JaegerQueryHandler for Instance { // {db}.{trace_table} // WHERE // service_name = '{service_name}' AND - // timestamp >= {start_time} AND - // timestamp <= {end_time} AND // span_kind = '{span_kind}' // ORDER BY // span_name ASC @@ -301,12 +289,18 @@ async fn query_trace_table( .unwrap_or(TRACE_TABLE_NAME); // If only select services, use the trace services table. + // If querying operations (distinct by span_name and span_kind), use the trace operations table. let table_name = { if match selects.as_slice() { [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN), _ => false, } { &trace_services_table_name(trace_table_name) + } else if !distincts.is_empty() + && distincts.contains(&col(SPAN_NAME_COLUMN)) + && distincts.contains(&col(SPAN_KIND_COLUMN)) + { + &trace_operations_table_name(trace_table_name) } else { trace_table_name } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index cb63b07772..59ab06c95e 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -29,7 +29,8 @@ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::{ PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME, - TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_services_table_name, + TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name, + trace_services_table_name, }; use common_grpc_expr::util::ColumnExpr; use common_meta::cache::TableFlownodeSetCacheRef; @@ -618,8 +619,10 @@ impl Inserter { // note that auto create table shouldn't be ttl instant table // for it's a very unexpected behavior and should be set by user explicitly for mut create_table in create_tables { - if create_table.table_name == trace_services_table_name(trace_table_name) { - // Disable append mode for trace services 
table since it requires upsert behavior. + if create_table.table_name == trace_services_table_name(trace_table_name) + || create_table.table_name == trace_operations_table_name(trace_table_name) + { + // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior. create_table .table_options .insert(APPEND_MODE_KEY.to_string(), "false".to_string()); diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index 77b598ad1a..9420c5ca2f 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -21,7 +21,6 @@ use axum::Extension; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode as HttpStatusCode}; use axum::response::IntoResponse; -use chrono::Utc; use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -52,7 +51,6 @@ pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name"; const REF_TYPE_CHILD_OF: &str = "CHILD_OF"; const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"]; -pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range"; /// JaegerAPIResponse is the response of Jaeger HTTP API. /// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go. 
@@ -528,13 +526,6 @@ pub async fn handle_get_operations( query_params, query_ctx, headers ); - let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) { - Ok((start, end)) => (start, end), - Err(e) => return error_response(e), - }; - - debug!("Get operations with start: {:?}, end: {:?}", start, end); - if let Some(service_name) = &query_params.service_name { update_query_context(&mut query_ctx, table_name); let query_ctx = Arc::new(query_ctx); @@ -546,13 +537,7 @@ pub async fn handle_get_operations( .start_timer(); match handler - .get_operations( - query_ctx, - service_name, - query_params.span_kind.as_deref(), - start, - end, - ) + .get_operations(query_ctx, service_name, query_params.span_kind.as_deref()) .await { Ok(output) => match covert_to_records(output).await { @@ -625,15 +610,7 @@ pub async fn handle_get_operations_by_service( .with_label_values(&[&db, "/api/services"]) .start_timer(); - let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) { - Ok((start, end)) => (start, end), - Err(e) => return error_response(e), - }; - - match handler - .get_operations(query_ctx, &service_name, None, start, end) - .await - { + match handler.get_operations(query_ctx, &service_name, None).await { Ok(output) => match covert_to_records(output).await { Ok(Some(records)) => match operations_from_records(records, false) { Ok(operations) => { @@ -1117,42 +1094,6 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option Result<(Option, Option)> { - if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) { - match time_range.to_str() { - Ok(time_range) => match humantime::parse_duration(time_range) { - Ok(duration) => { - debug!( - "Get operations with time range: {:?}, duration: {:?}", - time_range, duration - ); - let now = Utc::now().timestamp_micros(); - Ok((Some(now - duration.as_micros() as i64), Some(now))) - } - Err(e) => { - error!("Failed to parse time range header: 
{:?}", e); - Err(InvalidJaegerQuerySnafu { - reason: format!("invalid time range header: {:?}", time_range), - } - .build()) - } - }, - Err(e) => { - error!("Failed to convert time range header to string: {:?}", e); - Err(InvalidJaegerQuerySnafu { - reason: format!("invalid time range header: {:?}", time_range), - } - .build()) - } - } - } else { - Ok((query_params.start, query_params.end)) - } -} - #[cfg(test)] mod tests { use serde_json::{Number, Value as JsonValue, json}; diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs index 03f279fccb..d45b0e6802 100644 --- a/src/servers/src/otlp/trace/v0.rs +++ b/src/servers/src/otlp/trace/v0.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; -use common_catalog::consts::trace_services_table_name; +use common_catalog::consts::{trace_operations_table_name, trace_services_table_name}; use common_grpc::precision::Precision; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; @@ -35,6 +35,9 @@ use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; +// Use a timestamp(2100-01-01 00:00:00) as large as possible. +const MAX_TIMESTAMP: i64 = 4102444800000000000; + /// Convert SpanTraces to GreptimeDB row insert requests. 
/// Returns `InsertRequests` and total number of rows to ingest pub fn v0_to_grpc_insert_requests( @@ -49,23 +52,40 @@ pub fn v0_to_grpc_insert_requests( let mut multi_table_writer = MultiTableData::default(); let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); + let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); let mut services = HashSet::new(); + let mut operations = HashSet::new(); for span in spans { if let Some(service_name) = &span.service_name { // Only insert the service name if it's not already in the set. if !services.contains(service_name) { services.insert(service_name.clone()); } + + // Collect operations (service_name + span_name + span_kind). + let operation = ( + service_name.clone(), + span.span_name.clone(), + span.span_kind.clone(), + ); + if !operations.contains(&operation) { + operations.insert(operation); + } } write_span_to_row(&mut trace_writer, span)?; } write_trace_services_to_row(&mut trace_services_writer, services)?; + write_trace_operations_to_row(&mut trace_operations_writer, operations)?; multi_table_writer.add_table_data( trace_services_table_name(&table_name), trace_services_writer, ); + multi_table_writer.add_table_data( + trace_operations_table_name(&table_name), + trace_operations_writer, + ); multi_table_writer.add_table_data(table_name, trace_writer); Ok(multi_table_writer.into_row_insert_requests()) @@ -161,7 +181,7 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet row_writer::write_ts_to_nanos( writer, TIMESTAMP_COLUMN, - Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible. 
+ Some(MAX_TIMESTAMP), Precision::Nanosecond, &mut row, )?; @@ -180,3 +200,35 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet Ok(()) } + +fn write_trace_operations_to_row( + writer: &mut TableData, + operations: HashSet<(String, String, String)>, +) -> Result<()> { + for (service_name, span_name, span_kind) in operations { + let mut row = writer.alloc_one_row(); + // Write the timestamp as `MAX_TIMESTAMP` (2100-01-01 00:00:00), same as the services table. + row_writer::write_ts_to_nanos( + writer, + TIMESTAMP_COLUMN, + Some(MAX_TIMESTAMP), + Precision::Nanosecond, + &mut row, + )?; + + // Write the `service_name`, `span_name`, and `span_kind` columns. + row_writer::write_fields( + writer, + vec![ + make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)), + make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)), + make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)), + ] + .into_iter(), + &mut row, + )?; + writer.add_row(row); + } + + Ok(()) +} diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs index 306444bc18..b7dcdbce7c 100644 --- a/src/servers/src/otlp/trace/v1.rs +++ b/src/servers/src/otlp/trace/v1.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests, Value}; -use common_catalog::consts::trace_services_table_name; +use common_catalog::consts::{trace_operations_table_name, trace_services_table_name}; use common_grpc::precision::Precision; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue; @@ -37,6 +37,9 @@ use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 30; +// Use a timestamp(2100-01-01 00:00:00) as large as possible. +const MAX_TIMESTAMP: i64 = 4102444800000000000; + /// Convert SpanTraces to GreptimeDB row insert requests. 
/// Returns `InsertRequests` and total number of rows to ingest /// @@ -60,23 +63,40 @@ pub fn v1_to_grpc_insert_requests( let mut multi_table_writer = MultiTableData::default(); let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); + let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); let mut services = HashSet::new(); + let mut operations = HashSet::new(); for span in spans { if let Some(service_name) = &span.service_name { // Only insert the service name if it's not already in the set. if !services.contains(service_name) { services.insert(service_name.clone()); } + + // Only insert the operation if it's not already in the set. + let operation = ( + service_name.clone(), + span.span_name.clone(), + span.span_kind.clone(), + ); + if !operations.contains(&operation) { + operations.insert(operation); + } } write_span_to_row(&mut trace_writer, span)?; } write_trace_services_to_row(&mut trace_services_writer, services)?; + write_trace_operations_to_row(&mut trace_operations_writer, operations)?; multi_table_writer.add_table_data( trace_services_table_name(&table_name), trace_services_writer, ); + multi_table_writer.add_table_data( + trace_operations_table_name(&table_name), + trace_operations_writer, + ); multi_table_writer.add_table_data(table_name, trace_writer); Ok(multi_table_writer.into_row_insert_requests()) @@ -160,7 +180,7 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet row_writer::write_ts_to_nanos( writer, TIMESTAMP_COLUMN, - Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible. 
+ Some(MAX_TIMESTAMP), Precision::Nanosecond, &mut row, )?; @@ -177,6 +197,38 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet Ok(()) } +fn write_trace_operations_to_row( + writer: &mut TableData, + operations: HashSet<(String, String, String)>, +) -> Result<()> { + for (service_name, span_name, span_kind) in operations { + let mut row = writer.alloc_one_row(); + // Write the timestamp as `MAX_TIMESTAMP` (2100-01-01 00:00:00), same as the services table. + row_writer::write_ts_to_nanos( + writer, + TIMESTAMP_COLUMN, + Some(MAX_TIMESTAMP), + Precision::Nanosecond, + &mut row, + )?; + + // Write the `service_name`, `span_name`, and `span_kind` columns as tags. + row_writer::write_tags( + writer, + vec![ + (SERVICE_NAME_COLUMN.to_string(), service_name), + (SPAN_NAME_COLUMN.to_string(), span_name), + (SPAN_KIND_COLUMN.to_string(), span_kind), + ] + .into_iter(), + &mut row, + )?; + writer.add_row(row); + } + + Ok(()) +} + fn write_attributes( writer: &mut TableData, prefix: &str, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 74204be3e2..d41f68555b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -198,8 +198,6 @@ pub trait JaegerQueryHandler { ctx: QueryContextRef, service_name: &str, span_kind: Option<&str>, - start_time: Option, - end_time: Option, ) -> Result; /// Retrieves a trace by its unique identifier. 
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f6c28c6602..9113b356ae 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -25,7 +25,8 @@ use auth::user_provider_from_option; use axum::http::{HeaderName, HeaderValue, StatusCode}; use chrono::Utc; use common_catalog::consts::{ - DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, trace_services_table_name, + DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, trace_operations_table_name, + trace_services_table_name, }; use common_error::status_code::StatusCode as ErrorCode; use common_frontend::slow_query_event::{ @@ -48,7 +49,6 @@ use servers::http::header::constants::{ GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, }; use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME}; -use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; use servers::http::result::error_result::ErrorResponse; use servers::http::result::greptime_result_v1::GreptimedbV1Response; @@ -4496,6 +4496,19 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) { ) .await; + // Validate operations table + let expected = r#"[["telemetrygen","SPAN_KIND_CLIENT","lets-go"],["telemetrygen","SPAN_KIND_SERVER","okey-dokey-0"]]"#; + validate_data( + "otlp_traces_operations", + &client, + &format!( + "select service_name, span_kind, span_name from {} order by span_kind, span_name;", + trace_operations_table_name(TRACE_TABLE_NAME) + ), + expected, + ) + .await; + // select traces data let expected = 
r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; validate_data( @@ -4611,6 +4624,19 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; + // Validate operations table + let expected = r#"[["telemetrygen","SPAN_KIND_CLIENT","lets-go"],["telemetrygen","SPAN_KIND_SERVER","okey-dokey-0"]]"#; + validate_data( + "otlp_traces_operations_v1", + &client, + &format!( + "select service_name, span_kind, span_name from {} order by span_kind, span_name;", + trace_operations_table_name(trace_table_name) + ), + expected, + ) + .await; + // select traces data let expected = 
r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; @@ -6017,19 +6043,26 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { let res = client .get("/v1/jaeger/api/operations?service=test-jaeger-query-api") .header("x-greptime-trace-table-name", trace_table_name) - .header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days") .send() .await; assert_eq!(StatusCode::OK, res.status()); let expected = r#" { "data": [ + { + "name": "access-mysql", + "spanKind": "server" + }, { "name": "access-pg", "spanKind": "server" + }, + { + "name": "access-redis", + "spanKind": "server" } ], - "total": 1, + "total": 3, "limit": 0, "offset": 0, "errors": [] @@ -6050,9 +6083,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { { "data": [ "access-mysql", + "access-pg", "access-redis" ], - "total": 2, + "total": 3, "limit": 0, "offset": 0, "errors": []