diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 6c6ec5c5bf..62308112c6 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -137,4 +137,12 @@ pub const SPAN_ID_COLUMN: &str = "span_id"; pub const SPAN_NAME_COLUMN: &str = "span_name"; pub const SERVICE_NAME_COLUMN: &str = "service_name"; pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id"; +pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; +pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name"; +// ---- End of special table and fields ---- + +/// Generate the trace services table name from the trace table name by adding `_services` suffix. +pub fn trace_services_table_name(trace_table_name: &str) -> String { + format!("{}_services", trace_table_name) +} // ---- End of special table and fields ---- diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index edd94d450d..d1d7fbaf5d 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; +use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME}; use common_function::function::{Function, FunctionRef}; use common_function::scalars::json::json_get::{ JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString, @@ -38,7 +39,7 @@ use servers::error::{ use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY}; use servers::otlp::trace::{ DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN, - SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME, + SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, }; use servers::query_handler::JaegerQueryHandler; use session::context::QueryContextRef; @@ -48,7 +49,7 @@ use table::table::adapter::DfTableProviderAdapter; use super::Instance; -const DEFAULT_LIMIT: usize = 100; +const DEFAULT_LIMIT: usize = 2000; #[async_trait] impl JaegerQueryHandler for Instance { @@ -61,7 +62,7 @@ impl JaegerQueryHandler for Instance { vec![col(SERVICE_NAME_COLUMN)], vec![], vec![], - Some(DEFAULT_LIMIT), + None, None, vec![col(SERVICE_NAME_COLUMN)], ) @@ -216,10 +217,19 @@ async fn query_trace_table( tags: Option>, distincts: Vec, ) -> ServerResult { - let table_name = ctx + let trace_table_name = ctx .extension(JAEGER_QUERY_TABLE_NAME_KEY) .unwrap_or(TRACE_TABLE_NAME); + // If only select services, use the trace services table. + let table_name = { + if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) { + &trace_services_table_name(trace_table_name) + } else { + trace_table_name + } + }; + let table = catalog_manager .table( ctx.current_catalog(), diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 579be92e44..b2742685fb 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -28,7 +28,8 @@ use api::v1::{ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::{ - default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, + default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, + TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY, }; use common_grpc_expr::util::ColumnExpr; use common_meta::cache::TableFlownodeSetCacheRef; @@ -571,53 +572,69 @@ impl Inserter { } AutoCreateTableType::Trace => { + let trace_table_name = ctx + .extension(TRACE_TABLE_NAME_SESSION_KEY) + .unwrap_or(TRACE_TABLE_NAME); + // note that auto create table shouldn't be ttl instant table // for it's a very unexpected behavior and should be set by user explicitly for mut create_table in create_tables { - // prebuilt partition rules for uuid data: see the function - // for more information - let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN); - // add skip index to - // - trace_id: when searching by trace id - // - parent_span_id: when searching root span - // - span_name: when searching certain types of span - let index_columns = - [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN]; - for index_column in index_columns { - if let Some(col) = create_table - .column_defs - .iter_mut() - .find(|c| c.name == index_column) - { - col.options = options_from_skipping(&SkippingIndexOptions::default()) - .context(ColumnOptionsSnafu)?; - } else { - warn!( - "Column {} not found when creating index for trace table: {}.", - index_column, create_table.table_name - ); + if create_table.table_name == trace_services_table_name(trace_table_name) { + let table = self + .create_physical_table(create_table, None, ctx, statement_executor) + .await?; + let table_info = table.table_info(); + if table_info.is_ttl_instant_table() { + instant_table_ids.insert(table_info.table_id()); + } + table_infos.insert(table_info.table_id(), table.table_info()); + } else { + // prebuilt partition rules for uuid data: see the function + // for more information + let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN); + // add skip index to + // - trace_id: when searching by trace id + // - parent_span_id: when searching root span + // - span_name: when searching certain types of span + let index_columns = + [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN]; + for index_column in index_columns { + if let Some(col) = create_table + .column_defs + .iter_mut() + .find(|c| c.name == index_column) + { + col.options = + options_from_skipping(&SkippingIndexOptions::default()) + .context(ColumnOptionsSnafu)?; + } else { + warn!( + "Column {} not found when creating index for trace table: {}.", + index_column, create_table.table_name + ); + } } - } - // use table_options to mark table model version - create_table.table_options.insert( - TABLE_DATA_MODEL.to_string(), - TABLE_DATA_MODEL_TRACE_V1.to_string(), - ); + // use table_options to mark table model version + create_table.table_options.insert( + TABLE_DATA_MODEL.to_string(), + TABLE_DATA_MODEL_TRACE_V1.to_string(), + ); - let table = self - .create_physical_table( - create_table, - Some(partitions), - ctx, - statement_executor, - ) - .await?; - let table_info = table.table_info(); - if table_info.is_ttl_instant_table() { - instant_table_ids.insert(table_info.table_id()); + let table = self + .create_physical_table( + create_table, + Some(partitions), + ctx, + statement_executor, + ) + .await?; + let table_info = table.table_info(); + if table_info.is_ttl_instant_table() { + instant_table_ids.insert(table_info.table_id()); + } + table_infos.insert(table_info.table_id(), table.table_info()); } - table_infos.insert(table_info.table_id(), table.table_info()); } for alter_expr in alter_tables.into_iter() { statement_executor diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index b112743f05..0170346e79 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -58,6 +58,7 @@ pub mod constants { pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name"; + /// The header key that contains the pipeline params. pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params"; } diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index e50b22e2b5..9d59c9ae29 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -20,7 +20,7 @@ use axum::http::{HeaderMap, StatusCode as HttpStatusCode}; use axum::response::IntoResponse; use axum::Extension; use chrono::Utc; -use common_catalog::consts::PARENT_SPAN_ID_COLUMN; +use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; @@ -42,7 +42,7 @@ use crate::otlp::trace::{ KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, - SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME, + SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, }; use crate::query_handler::JaegerQueryHandlerRef; @@ -337,7 +337,11 @@ pub async fn handle_get_services( query_params, query_ctx ); - update_query_context(&mut query_ctx, table_name); + query_ctx.set_channel(Channel::Jaeger); + if let Some(table) = table_name { + query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table); + } + let query_ctx = Arc::new(query_ctx); let db = query_ctx.get_db_string(); diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 90f97843f3..a5bc0c6410 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -19,6 +19,7 @@ use axum::http::header; use axum::response::IntoResponse; use axum::Extension; use bytes::Bytes; +use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY}; use common_telemetry::tracing; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -38,7 +39,6 @@ use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, PipelineSnafu, Result}; use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName}; use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED; -use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler}; #[axum_macros::debug_handler] @@ -82,6 +82,8 @@ pub async fn traces( let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string()); query_ctx.set_channel(Channel::Otlp); + query_ctx.set_extension(TRACE_TABLE_NAME_SESSION_KEY, &table_name); + let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED .with_label_values(&[db.as_str()]) diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 633dd498ca..7c2d9da2f9 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -28,8 +28,6 @@ use session::context::QueryContextRef; use crate::error::{NotSupportedSnafu, Result}; use crate::query_handler::PipelineHandlerRef; -pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; - // column names pub const SERVICE_NAME_COLUMN: &str = "service_name"; pub const TIMESTAMP_COLUMN: &str = "timestamp"; diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs index b46d29a1f2..12db06635e 100644 --- a/src/servers/src/otlp/trace/v0.rs +++ b/src/servers/src/otlp/trace/v0.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; +use common_catalog::consts::trace_services_table_name; use common_grpc::precision::Precision; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; @@ -44,15 +47,26 @@ pub fn v0_to_grpc_insert_requests( ) -> Result<(RowInsertRequests, usize)> { let spans = parse(request); let mut multi_table_writer = MultiTableData::default(); - let one_table_writer = multi_table_writer.get_or_default_table_data( - table_name, - APPROXIMATE_COLUMN_COUNT, - spans.len(), - ); + let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); + let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); + let mut services = HashSet::new(); for span in spans { - write_span_to_row(one_table_writer, span)?; + if let Some(service_name) = &span.service_name { + // Only insert the service name if it's not already in the set. + if !services.contains(service_name) { + services.insert(service_name.clone()); + } + } + write_span_to_row(&mut trace_writer, span)?; } + write_trace_services_to_row(&mut trace_services_writer, services)?; + + multi_table_writer.add_table_data( + trace_services_table_name(&table_name), + trace_services_writer, + ); + multi_table_writer.add_table_data(table_name, trace_writer); Ok(multi_table_writer.into_row_insert_requests()) } @@ -142,3 +156,27 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> Ok(()) } + +fn write_trace_services_to_row(writer: &mut TableData, services: HashSet) -> Result<()> { + for service_name in services { + let mut row = writer.alloc_one_row(); + // Write the timestamp as 0. + row_writer::write_ts_to_nanos( + writer, + TIMESTAMP_COLUMN, + Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible. + Precision::Nanosecond, + &mut row, + )?; + + // Write the `service_name` column. + row_writer::write_fields( + writer, + std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)), + &mut row, + )?; + writer.add_row(row); + } + + Ok(()) +} diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs index 442c9c49d9..2118083c65 100644 --- a/src/servers/src/otlp/trace/v1.rs +++ b/src/servers/src/otlp/trace/v1.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests, Value}; +use common_catalog::consts::trace_services_table_name; use common_grpc::precision::Precision; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue; @@ -55,16 +58,26 @@ pub fn v1_to_grpc_insert_requests( ) -> Result<(RowInsertRequests, usize)> { let spans = parse(request); let mut multi_table_writer = MultiTableData::default(); + let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); + let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); - let one_table_writer = multi_table_writer.get_or_default_table_data( - table_name, - APPROXIMATE_COLUMN_COUNT, - spans.len(), - ); - + let mut services = HashSet::new(); for span in spans { - write_span_to_row(one_table_writer, span)?; + if let Some(service_name) = &span.service_name { + // Only insert the service name if it's not already in the set. + if !services.contains(service_name) { + services.insert(service_name.clone()); + } + } + write_span_to_row(&mut trace_writer, span)?; } + write_trace_services_to_row(&mut trace_services_writer, services)?; + + multi_table_writer.add_table_data( + trace_services_table_name(&table_name), + trace_services_writer, + ); + multi_table_writer.add_table_data(table_name, trace_writer); Ok(multi_table_writer.into_row_insert_requests()) } @@ -150,6 +163,30 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> Ok(()) } +fn write_trace_services_to_row(writer: &mut TableData, services: HashSet) -> Result<()> { + for service_name in services { + let mut row = writer.alloc_one_row(); + // Write the timestamp as 0. + row_writer::write_ts_to_nanos( + writer, + TIMESTAMP_COLUMN, + Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible. + Precision::Nanosecond, + &mut row, + )?; + + // Write the `service_name` column. + row_writer::write_fields( + writer, + std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)), + &mut row, + )?; + writer.add_row(row); + } + + Ok(()) +} + fn write_attributes( writer: &mut TableData, prefix: &str, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8cb67e49ec..1c665d96e1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -20,6 +20,7 @@ use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; use axum::http::{HeaderName, HeaderValue, StatusCode}; use chrono::Utc; +use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; @@ -2356,6 +2357,18 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); + let expected = r#"[["telemetrygen"]]"#; + validate_data( + "otlp_traces", + &client, + &format!( + "select service_name from {};", + trace_services_table_name(TRACE_TABLE_NAME) + ), + expected, + ) + .await; + // select traces data let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; validate_data( @@ -2428,6 +2441,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { {"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} "#; + let trace_table_name = "mytable"; let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); let body = req.encode_to_vec(); @@ -2448,7 +2462,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ), ( HeaderName::from_static("x-greptime-trace-table-name"), - HeaderValue::from_static("mytable"), + HeaderValue::from_static(trace_table_name), ), ], "/v1/otlp/v1/traces", @@ -2458,6 +2472,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); + let expected = r#"[["telemetrygen"]]"#; + validate_data( + "otlp_traces", + &client, + &format!( + "select service_name from {};", + trace_services_table_name(trace_table_name) + ), + expected, + ) + .await; + // select traces data let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; @@ -2471,6 +2497,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; + let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\")\n)\n\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#; + validate_data( + "otlp_traces", + &client, + &format!( + "show create table {};", + trace_services_table_name(trace_table_name) + ), + expected_ddl, + ) + .await; + // drop table let res = client.get("/v1/sql?sql=drop table mytable;").send().await; assert_eq!(res.status(), StatusCode::OK); @@ -2489,7 +2527,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ), ( HeaderName::from_static("x-greptime-trace-table-name"), - HeaderValue::from_static("mytable"), + HeaderValue::from_static(trace_table_name), ), ], "/v1/otlp/v1/traces", @@ -3361,11 +3399,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { let client = TestClient::new(app).await; // Test empty response for `/api/services` API before writing any traces. - let res = client - .get("/v1/jaeger/api/services") - .header("x-greptime-trace-table-name", "mytable") - .send() - .await; + let res = client.get("/v1/jaeger/api/services").send().await; assert_eq!(StatusCode::OK, res.status()); let expected = r#" { @@ -3526,6 +3560,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { } let body = req.encode_to_vec(); + let trace_table_name = "mytable"; // write traces data. let res = send_req( &client, @@ -3540,7 +3575,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { ), ( HeaderName::from_static("x-greptime-trace-table-name"), - HeaderValue::from_static("mytable"), + HeaderValue::from_static(trace_table_name), ), ], "/v1/otlp/v1/traces", @@ -3553,7 +3588,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/services` API. let res = client .get("/v1/jaeger/api/services") - .header("x-greptime-trace-table-name", "mytable") + .header("x-greptime-trace-table-name", trace_table_name) .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3575,7 +3610,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/operations` API. let res = client .get("/v1/jaeger/api/operations?service=test-jaeger-query-api") - .header("x-greptime-trace-table-name", "mytable") + .header("x-greptime-trace-table-name", trace_table_name) .header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days") .send() .await; @@ -3601,7 +3636,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/services/{service_name}/operations` API. let res = client .get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422") - .header("x-greptime-trace-table-name", "mytable") + .header("x-greptime-trace-table-name", trace_table_name) .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3624,7 +3659,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/traces/{trace_id}` API. let res = client .get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea") - .header("x-greptime-trace-table-name", "mytable") + .header("x-greptime-trace-table-name", trace_table_name) .send() .await; assert_eq!(StatusCode::OK, res.status()); @@ -3740,7 +3775,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { // Test `/api/traces` API. let res = client .get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D") - .header("x-greptime-trace-table-name", "mytable") + .header("x-greptime-trace-table-name", trace_table_name) .send() .await; assert_eq!(StatusCode::OK, res.status());