diff --git a/Cargo.lock b/Cargo.lock index 2920b50303..bdb317b202 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4336,6 +4336,7 @@ dependencies = [ "common-test-util", "common-time", "common-version", + "datafusion", "datafusion-expr", "datanode", "datatypes", diff --git a/config/config.md b/config/config.md index 5f348e0a6d..1831a2f644 100644 --- a/config/config.md +++ b/config/config.md @@ -60,6 +60,8 @@ | `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | +| `jaeger` | -- | -- | Jaeger protocol options. | +| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | | `prom_store.enable` | Bool | `true` | Whether to enable Prometheus remote write and read in HTTP API. | | `prom_store.with_metric_engine` | Bool | `true` | Whether to store the data from Prometheus remote write in metric engine. | @@ -256,6 +258,8 @@ | `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | +| `jaeger` | -- | -- | Jaeger protocol options. | +| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | | `prom_store.enable` | Bool | `true` | Whether to enable Prometheus remote write and read in HTTP API. | | `prom_store.with_metric_engine` | Bool | `true` | Whether to store the data from Prometheus remote write in metric engine. 
| diff --git a/config/frontend.example.toml b/config/frontend.example.toml index addea0454a..c2e2f4a208 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -138,6 +138,11 @@ enable = true ## Whether to enable InfluxDB protocol in HTTP API. enable = true +## Jaeger protocol options. +[jaeger] +## Whether to enable Jaeger protocol in HTTP API. +enable = true + ## Prometheus remote storage options [prom_store] ## Whether to enable Prometheus remote write and read in HTTP API. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 005ff282f6..bea6984a65 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -142,6 +142,11 @@ enable = true ## Whether to enable InfluxDB protocol in HTTP API. enable = true +## Jaeger protocol options. +[jaeger] +## Whether to enable Jaeger protocol in HTTP API. +enable = true + ## Prometheus remote storage options [prom_store] ## Whether to enable Prometheus remote write and read in HTTP API. 
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5ef998e082..8faca3e78b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -60,7 +60,8 @@ use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::server::Services; use frontend::service_config::{ - InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, + InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, + PromStoreOptions, }; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use mito2::config::MitoConfig; @@ -140,6 +141,7 @@ pub struct StandaloneOptions { pub postgres: PostgresOptions, pub opentsdb: OpentsdbOptions, pub influxdb: InfluxdbOptions, + pub jaeger: JaegerOptions, pub prom_store: PromStoreOptions, pub wal: DatanodeWalConfig, pub storage: StorageConfig, @@ -169,6 +171,7 @@ impl Default for StandaloneOptions { postgres: PostgresOptions::default(), opentsdb: OpentsdbOptions::default(), influxdb: InfluxdbOptions::default(), + jaeger: JaegerOptions::default(), prom_store: PromStoreOptions::default(), wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), @@ -217,6 +220,7 @@ impl StandaloneOptions { postgres: cloned_opts.postgres, opentsdb: cloned_opts.opentsdb, influxdb: cloned_opts.influxdb, + jaeger: cloned_opts.jaeger, prom_store: cloned_opts.prom_store, meta_client: None, logging: cloned_opts.logging, diff --git a/src/common/function/src/scalars/json.rs b/src/common/function/src/scalars/json.rs index 2c420c1661..9cde42dcdb 100644 --- a/src/common/function/src/scalars/json.rs +++ b/src/common/function/src/scalars/json.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::sync::Arc; -mod json_get; +pub mod json_get; mod json_is; mod json_path_exists; mod json_path_match; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index d0ce4a9d01..ab933cb24b 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -35,6 +35,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version.workspace = true +datafusion.workspace = true datafusion-expr.workspace = true datanode.workspace = true datatypes.workspace = true @@ -52,6 +53,7 @@ promql-parser.workspace = true prost.workspace = true query.workspace = true serde.workspace = true +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index a424c1c809..f97ac4681d 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -24,7 +24,8 @@ use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; use crate::service_config::{ - InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions, PromStoreOptions, + InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions, + PromStoreOptions, }; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -40,6 +41,7 @@ pub struct FrontendOptions { pub opentsdb: OpentsdbOptions, pub influxdb: InfluxdbOptions, pub prom_store: PromStoreOptions, + pub jaeger: JaegerOptions, pub otlp: OtlpOptions, pub meta_client: Option, pub logging: LoggingOptions, @@ -62,6 +64,7 @@ impl Default for FrontendOptions { postgres: PostgresOptions::default(), opentsdb: OpentsdbOptions::default(), influxdb: InfluxdbOptions::default(), + jaeger: JaegerOptions::default(), prom_store: PromStoreOptions::default(), otlp: OtlpOptions::default(), meta_client: None, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1654544a48..eb3e1d997b 100644 --- 
a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -15,6 +15,7 @@ pub mod builder; mod grpc; mod influxdb; +mod jaeger; mod log_handler; mod logs; mod opentsdb; @@ -65,7 +66,7 @@ use servers::prometheus_handler::PrometheusHandler; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler, + InfluxdbLineProtocolHandler, JaegerQueryHandler, LogQueryHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler, }; use servers::server::ServerHandlers; @@ -100,6 +101,7 @@ pub trait FrontendInstance: + PrometheusHandler + PipelineHandler + LogQueryHandler + + JaegerQueryHandler + Send + Sync + 'static @@ -167,6 +169,10 @@ impl Instance { &self.catalog_manager } + pub fn query_engine(&self) -> &QueryEngineRef { + &self.query_engine + } + pub fn plugins(&self) -> Plugins { self.plugins.clone() } diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs new file mode 100644 index 0000000000..b82cf65e2d --- /dev/null +++ b/src/frontend/src/instance/jaeger.rs @@ -0,0 +1,337 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::CatalogManagerRef; +use common_function::function::{Function, FunctionRef}; +use common_function::scalars::json::json_get::{ + JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString, +}; +use common_function::scalars::udf::create_udf; +use common_function::state::FunctionState; +use common_query::Output; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use datafusion::dataframe::DataFrame; +use datafusion::execution::context::SessionContext; +use datafusion::execution::SessionStateBuilder; +use datafusion_expr::{col, lit, lit_timestamp_nano, Expr}; +use query::QueryEngineRef; +use serde_json::Value as JsonValue; +use servers::error::{ + CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, + TableNotFoundSnafu, +}; +use servers::http::jaeger::QueryTraceParams; +use servers::otlp::trace::{ + DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN, + SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, + TRACE_TABLE_NAME, +}; +use servers::query_handler::JaegerQueryHandler; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; +use table::table::adapter::DfTableProviderAdapter; + +use super::Instance; + +const DEFAULT_LIMIT: usize = 100; + +#[async_trait] +impl JaegerQueryHandler for Instance { + async fn get_services(&self, ctx: QueryContextRef) -> ServerResult { + // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`. + Ok(query_trace_table( + ctx, + self.catalog_manager(), + self.query_engine(), + vec![col(SERVICE_NAME_COLUMN)], + vec![], + Some(DEFAULT_LIMIT), + None, + true, + ) + .await?) 
+ } + + async fn get_operations( + &self, + ctx: QueryContextRef, + service_name: &str, + span_kind: Option<&str>, + ) -> ServerResult { + let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))]; + + if let Some(span_kind) = span_kind { + filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!( + "{}{}", + SPAN_KIND_PREFIX, + span_kind.to_uppercase() + )))); + } + + // It's equivalent to `SELECT span_name, span_kind, service_name FROM {db}.{trace_table} WHERE service_name = '{service_name}' [AND span_kind = '{SPAN_KIND_PREFIX}{SPAN_KIND}'] LIMIT {DEFAULT_LIMIT}`; the `span_kind` predicate is only added when the caller supplies one. + Ok(query_trace_table( + ctx, + self.catalog_manager(), + self.query_engine(), + vec![ + col(SPAN_NAME_COLUMN), + col(SPAN_KIND_COLUMN), + col(SERVICE_NAME_COLUMN), + ], + filters, + Some(DEFAULT_LIMIT), + None, + false, + ) + .await?) + } + + async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult { + // It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes FROM {db}.{trace_table} WHERE trace_id = '{trace_id}' LIMIT {DEFAULT_LIMIT}`. + let selects = vec![ + col(TRACE_ID_COLUMN), + col(TIMESTAMP_COLUMN), + col(DURATION_NANO_COLUMN), + col(SERVICE_NAME_COLUMN), + col(SPAN_NAME_COLUMN), + col(SPAN_ID_COLUMN), + col(SPAN_ATTRIBUTES_COLUMN), + ]; + + let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))]; + + Ok(query_trace_table( + ctx, + self.catalog_manager(), + self.query_engine(), + selects, + filters, + Some(DEFAULT_LIMIT), + None, + false, + ) + .await?) 
+ } + + async fn find_traces( + &self, + ctx: QueryContextRef, + query_params: QueryTraceParams, + ) -> ServerResult { + let selects = vec![ + col(TRACE_ID_COLUMN), + col(TIMESTAMP_COLUMN), + col(DURATION_NANO_COLUMN), + col(SERVICE_NAME_COLUMN), + col(SPAN_NAME_COLUMN), + col(SPAN_ID_COLUMN), + col(SPAN_ATTRIBUTES_COLUMN), + ]; + + let mut filters = vec![]; + + if let Some(operation_name) = query_params.operation_name { + filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name))); + } + + if let Some(start_time) = query_params.start_time { + filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time))); + } + + if let Some(end_time) = query_params.end_time { + filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time))); + } + + if let Some(min_duration) = query_params.min_duration { + filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration))); + } + + if let Some(max_duration) = query_params.max_duration { + filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration))); + } + + Ok(query_trace_table( + ctx, + self.catalog_manager(), + self.query_engine(), + selects, + filters, + Some(DEFAULT_LIMIT), + query_params.tags, + false, + ) + .await?) + } +} + +#[allow(clippy::too_many_arguments)] +async fn query_trace_table( + ctx: QueryContextRef, + catalog_manager: &CatalogManagerRef, + query_engine: &QueryEngineRef, + selects: Vec, + filters: Vec, + limit: Option, + tags: Option>, + distinct: bool, +) -> ServerResult { + let db = ctx.get_db_string(); + let table = catalog_manager + .table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx)) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table: TRACE_TABLE_NAME, + catalog: ctx.current_catalog(), + schema: db, + })?; + + let df_context = create_df_context(query_engine, ctx.clone())?; + + let dataframe = df_context + .read_table(Arc::new(DfTableProviderAdapter::new(table))) + .context(DataFusionSnafu)?; + + let dataframe = dataframe.select(selects).context(DataFusionSnafu)?; + + // Apply all filters. + let dataframe = filters + .into_iter() + .chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?) + .try_fold(dataframe, |df, expr| { + df.filter(expr).context(DataFusionSnafu) + })?; + + // Apply the distinct if needed. + let dataframe = if distinct { + dataframe.distinct().context(DataFusionSnafu)? + } else { + dataframe + }; + + // Apply the limit if needed. + let dataframe = if let Some(limit) = limit { + dataframe.limit(0, Some(limit)).context(DataFusionSnafu)? + } else { + dataframe + }; + + // Execute the query and collect the result. + let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?; + + let output = Output::new_with_stream(Box::pin( + RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?, + )); + + Ok(output) +} + +// The current implementation registers UDFs during the planning stage, which makes it difficult +// to utilize them through DataFrame APIs. To address this limitation, we create a new session +// context and register the required UDFs, allowing them to be decoupled from the global context. +// TODO(zyy17): Is it possible or necessary to reuse the existing session context? +fn create_df_context( + query_engine: &QueryEngineRef, + ctx: QueryContextRef, +) -> ServerResult { + let df_context = SessionContext::new_with_state( + SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(), + ); + + // The following JSON UDFs will be used for tags filters. 
+ let udfs: Vec = vec![ + Arc::new(JsonGetInt), + Arc::new(JsonGetFloat), + Arc::new(JsonGetBool), + Arc::new(JsonGetString), + ]; + + for udf in udfs { + df_context + .register_udf(create_udf(udf, ctx.clone(), Arc::new(FunctionState::default())).into()); + } + + Ok(df_context) +} + +fn tags_filters( + dataframe: &DataFrame, + tags: HashMap, +) -> ServerResult> { + let mut filters = vec![]; + + // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value. + for (key, value) in tags.iter() { + if let JsonValue::String(value) = value { + filters.push( + dataframe + .registry() + .udf(JsonGetString {}.name()) + .context(DataFusionSnafu)? + .call(vec![ + col(SPAN_ATTRIBUTES_COLUMN), + lit(format!("[\"{}\"]", key)), + ]) + .eq(lit(value)), + ); + } + if let JsonValue::Number(value) = value { + if value.is_i64() { + filters.push( + dataframe + .registry() + .udf(JsonGetInt {}.name()) + .context(DataFusionSnafu)? + .call(vec![ + col(SPAN_ATTRIBUTES_COLUMN), + lit(format!("[\"{}\"]", key)), + ]) + .eq(lit(value.as_i64().unwrap())), + ); + } + if value.is_f64() { + filters.push( + dataframe + .registry() + .udf(JsonGetFloat {}.name()) + .context(DataFusionSnafu)? + .call(vec![ + col(SPAN_ATTRIBUTES_COLUMN), + lit(format!("[\"{}\"]", key)), + ]) + .eq(lit(value.as_f64().unwrap())), + ); + } + } + if let JsonValue::Bool(value) = value { + filters.push( + dataframe + .registry() + .udf(JsonGetBool {}.name()) + .context(DataFusionSnafu)? 
+ .call(vec![ + col(SPAN_ATTRIBUTES_COLUMN), + lit(format!("[\"{}\"]", key)), + ]) + .eq(lit(*value)), + ); + } + } + + Ok(filters) +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index cb3284c9f8..1ca3d40e9a 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -112,6 +112,11 @@ where if opts.otlp.enable { builder = builder.with_otlp_handler(self.instance.clone()); } + + if opts.jaeger.enable { + builder = builder.with_jaeger_handler(self.instance.clone()); + } + builder } diff --git a/src/frontend/src/service_config.rs b/src/frontend/src/service_config.rs index a3fa7ad180..cd4c0dabe9 100644 --- a/src/frontend/src/service_config.rs +++ b/src/frontend/src/service_config.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod influxdb; +pub mod jaeger; pub mod mysql; pub mod opentsdb; pub mod otlp; @@ -20,6 +21,7 @@ pub mod postgres; pub mod prom_store; pub use influxdb::InfluxdbOptions; +pub use jaeger::JaegerOptions; pub use mysql::MysqlOptions; pub use opentsdb::OpentsdbOptions; pub use otlp::OtlpOptions; diff --git a/src/frontend/src/service_config/jaeger.rs b/src/frontend/src/service_config/jaeger.rs new file mode 100644 index 0000000000..47c8725be6 --- /dev/null +++ b/src/frontend/src/service_config/jaeger.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +/// Options for Jaeger query APIs. 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct JaegerOptions { + /// Whether to enable Jaeger query APIs. + pub enable: bool, +} + +impl Default for JaegerOptions { + fn default() -> Self { + Self { enable: true } + } +} + +#[cfg(test)] +mod tests { + use super::JaegerOptions; + + #[test] + fn test_jaeger_options() { + let default = JaegerOptions::default(); + assert!(default.enable); + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 87674866d2..0eb31f31b5 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -292,7 +292,7 @@ impl QueryEngineState { .unwrap_or(false) } - pub(crate) fn session_state(&self) -> SessionState { + pub fn session_state(&self) -> SessionState { self.df_context.state() } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index adfe3ab841..7c5bc48909 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -25,6 +25,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_telemetry::{error, warn}; +use datafusion::error::DataFusionError; use datatypes::prelude::ConcreteDataType; use headers::ContentType; use http::header::InvalidHeaderValue; @@ -598,6 +599,21 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid Jaeger query, reason: {}", reason))] + InvalidJaegerQuery { + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("DataFusion error"))] + DataFusion { + #[snafu(source)] + error: DataFusionError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -668,7 +684,8 @@ impl ErrorExt for Error { | InvalidTableName { .. } | PrepareStatementNotFound { .. } | FailedToParseQuery { .. } - | InvalidElasticsearchInput { .. } => StatusCode::InvalidArguments, + | InvalidElasticsearchInput { .. 
} + | InvalidJaegerQuery { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), @@ -711,7 +728,7 @@ impl ErrorExt for Error { ConvertScalarValue { source, .. } => source.status_code(), - ToJson { .. } => StatusCode::Internal, + ToJson { .. } | DataFusion { .. } => StatusCode::Internal, ConvertSqlValue { source, .. } => source.status_code(), diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 67e3da1817..02652ea1e7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -72,8 +72,9 @@ use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, LogQueryHandlerRef, OpenTelemetryProtocolHandlerRef, - OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef, + InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef, + OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef, + PromStoreProtocolHandlerRef, }; use crate::server::Server; @@ -86,6 +87,7 @@ mod extractor; pub mod handler; pub mod header; pub mod influxdb; +pub mod jaeger; pub mod logs; pub mod loki; pub mod mem_prof; @@ -652,6 +654,16 @@ impl HttpServerBuilder { } } + pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self { + Self { + router: self.router.nest( + &format!("/{HTTP_API_VERSION}/jaeger"), + HttpServer::route_jaeger(handler), + ), + ..self + } + } + pub fn with_extra_router(self, router: Router) -> Self { Self { router: self.router.merge(router), @@ -1054,6 +1066,25 @@ impl HttpServer { .route("/config", routing::get(handler::config)) .with_state(state) } + + fn route_jaeger(handler: JaegerQueryHandlerRef) -> Router { + Router::new() + .route("/api/services", routing::get(jaeger::handle_get_services)) + .route( + "/api/services/{service_name}/operations", 
+ routing::get(jaeger::handle_get_operations_by_service), + ) + .route( + "/api/operations", + routing::get(jaeger::handle_get_operations), + ) + .route("/api/traces", routing::get(jaeger::handle_find_traces)) + .route( + "/api/traces/{trace_id}", + routing::get(jaeger::handle_get_trace), + ) + .with_state(handler) + } } pub const HTTP_SERVER: &str = "HTTP_SERVER"; diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs new file mode 100644 index 0000000000..e4bc29d95f --- /dev/null +++ b/src/servers/src/http/jaeger.rs @@ -0,0 +1,1278 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::Extension; +use common_error::ext::ErrorExt; +use common_query::{Output, OutputData}; +use common_recordbatch::util; +use common_telemetry::{debug, error, tracing, warn}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use session::context::{Channel, QueryContext}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, +}; +use crate::http::HttpRecordsOutput; +use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED; +use crate::otlp::trace::{ + DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN, + SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, +}; +use crate::query_handler::JaegerQueryHandlerRef; + +/// JaegerAPIResponse is the response of Jaeger HTTP API. +/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go. +#[derive(Default, Debug, Serialize, Deserialize, PartialEq)] +pub struct JaegerAPIResponse { + pub data: Option, + pub total: usize, + pub limit: usize, + pub offset: usize, + pub errors: Vec, +} + +/// JaegerData is the query result of Jaeger HTTP API. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(untagged)] +pub enum JaegerData { + ServiceNames(Vec), + OperationsNames(Vec), + Operations(Vec), + Traces(Vec), +} + +/// JaegerAPIError is the error of Jaeger HTTP API. +#[derive(Default, Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct JaegerAPIError { + pub code: i32, + pub msg: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub trace_id: Option, +} + +/// Operation is an operation in a service. 
+#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Operation { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub span_kind: Option, +} + +/// Trace is a collection of spans. +#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Trace { + #[serde(rename = "traceID")] + pub trace_id: String, + pub spans: Vec, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub processes: HashMap, + + #[serde(skip_serializing_if = "Vec::is_empty")] + pub warnings: Vec, +} + +/// Span is a single operation within a trace. +#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Span { + #[serde(rename = "traceID")] + pub trace_id: String, + + #[serde(rename = "spanID")] + pub span_id: String, + + #[serde(rename = "parentSpanID")] + #[serde(skip_serializing_if = "String::is_empty")] + pub parent_span_id: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub flags: Option, + + pub operation_name: String, + pub references: Vec, + pub start_time: u64, // microseconds since unix epoch + pub duration: u64, // microseconds + pub tags: Vec, + pub logs: Vec, + + #[serde(rename = "processID")] + #[serde(skip_serializing_if = "String::is_empty")] + pub process_id: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub process: Option, + + #[serde(skip_serializing_if = "Vec::is_empty")] + pub warnings: Vec, +} + +/// Reference is a reference from one span to another. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Reference { + #[serde(rename = "traceID")] + pub trace_id: String, + #[serde(rename = "spanID")] + pub span_id: String, + pub ref_type: String, +} + +/// Process is the process emitting a set of spans. 
+#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Process { + pub service_name: String, + pub tags: Vec, +} + +/// Log is a log emitted in a span. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Log { + pub timestamp: i64, + pub fields: Vec, +} + +/// KeyValue is a key-value pair with typed value. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct KeyValue { + pub key: String, + #[serde(rename = "type")] + pub value_type: ValueType, + pub value: Value, +} + +/// Value is the value of a key-value pair in Jaeger Span attributes. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(untagged)] +#[serde(rename_all = "camelCase")] +pub enum Value { + String(String), + Int64(i64), + Float64(f64), + Boolean(bool), + Binary(Vec), +} + +/// ValueType is the type of a value stored in KeyValue struct. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ValueType { + String, + Int64, + Float64, + Boolean, + Binary, +} + +/// JaegerQueryParams is the query parameters of Jaeger HTTP API. +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JaegerQueryParams { + /// Database that the trace data stored in. + pub db: Option, + + /// Service name of the trace. + #[serde(rename = "service")] + pub service_name: Option, + + /// Operation name of the trace. + #[serde(rename = "operation")] + pub operation_name: Option, + + /// Limit the return data. + pub limit: Option, + + /// Start time of the trace in microseconds since unix epoch. + pub start: Option, + + /// End time of the trace in microseconds since unix epoch. + pub end: Option, + + /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + pub max_duration: Option, + + /// Min duration string value of the trace. 
Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + pub min_duration: Option, + + /// Tags of the trace in JSON format. It will be URL encoded in the raw query. + /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}". + /// The key and value of the map are both strings. Each key is a span attribute name and each value is that attribute's value. The value will be converted to the corresponding type when querying. + pub tags: Option, + + /// The span kind of the trace. + pub span_kind: Option, +} + +impl QueryTraceParams { + fn from_jaeger_query_params(db: &str, query_params: JaegerQueryParams) -> Result { + let mut internal_query_params: QueryTraceParams = QueryTraceParams { + db: db.to_string(), + ..Default::default() + }; + + internal_query_params.service_name = + query_params.service_name.context(InvalidJaegerQuerySnafu { + reason: "service_name is required".to_string(), + })?; + + internal_query_params.operation_name = query_params.operation_name; + + // Convert start time from microseconds to nanoseconds. + internal_query_params.start_time = query_params.start.map(|start| start * 1000); + + // Convert end time from microseconds to nanoseconds. 
+ internal_query_params.end_time = query_params.end.map(|end| end * 1000); + + if let Some(max_duration) = query_params.max_duration { + let duration = humantime::parse_duration(&max_duration).map_err(|e| { + InvalidJaegerQuerySnafu { + reason: format!("parse maxDuration '{}' failed: {}", max_duration, e), + } + .build() + })?; + internal_query_params.max_duration = Some(duration.as_nanos() as u64); + } + + if let Some(min_duration) = query_params.min_duration { + let duration = humantime::parse_duration(&min_duration).map_err(|e| { + InvalidJaegerQuerySnafu { + reason: format!("parse minDuration '{}' failed: {}", min_duration, e), + } + .build() + })?; + internal_query_params.min_duration = Some(duration.as_nanos() as u64); + } + + if let Some(tags) = query_params.tags { + // Deserialize the tags JSON string into a map. + let mut tags_map: HashMap = + serde_json::from_str(&tags).map_err(|e| { + InvalidJaegerQuerySnafu { + reason: format!("parse tags '{}' failed: {}", tags, e), + } + .build() + })?; + for (_, v) in tags_map.iter_mut() { + if let Some(number) = convert_string_to_number(v) { + *v = number; + } + if let Some(boolean) = convert_string_to_boolean(v) { + *v = boolean; + } + } + internal_query_params.tags = Some(tags_map); + } + + internal_query_params.limit = query_params.limit; + + Ok(internal_query_params) + } +} + +#[derive(Debug, Default, PartialEq)] +pub struct QueryTraceParams { + pub db: String, + pub service_name: String, + pub operation_name: Option, + + // The limit of the number of traces to return. + pub limit: Option, + + // Select the traces with the given tags (span attributes). + pub tags: Option>, + + // The unit of the following time related parameters is nanoseconds. + pub start_time: Option, + pub end_time: Option, + pub min_duration: Option, + pub max_duration: Option, +} + +/// Handle the GET `/api/services` request. 
+#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))] +pub async fn handle_get_services( + State(handler): State, + Query(query_params): Query, + Extension(mut query_ctx): Extension, +) -> impl IntoResponse { + debug!( + "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}", + query_params, query_ctx + ); + query_ctx.set_channel(Channel::Jaeger); + let query_ctx = Arc::new(query_ctx); + let db = query_ctx.get_db_string(); + + // Record the query time histogram. + let _timer = METRIC_JAEGER_QUERY_ELAPSED + .with_label_values(&[&db, "/api/services"]) + .start_timer(); + + match handler.get_services(query_ctx).await { + Ok(output) => match covert_to_records(output).await { + Ok(Some(records)) => match services_from_records(records) { + Ok(services) => { + let services_num = services.len(); + ( + StatusCode::OK, + axum::Json(JaegerAPIResponse { + data: Some(JaegerData::ServiceNames(services)), + total: services_num, + ..Default::default() + }), + ) + } + Err(err) => { + error!("Failed to get services: {:?}", err); + error_response(err) + } + }, + Ok(None) => (StatusCode::OK, axum::Json(JaegerAPIResponse::default())), + Err(err) => { + error!("Failed to get services: {:?}", err); + error_response(err) + } + }, + Err(err) => error_response(err), + } +} + +/// Handle the GET `/api/traces/{trace_id}` request. 
+#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))] +pub async fn handle_get_trace( + State(handler): State, + Path(trace_id): Path, + Query(query_params): Query, + Extension(mut query_ctx): Extension, +) -> impl IntoResponse { + debug!( + "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}", + trace_id, query_params, query_ctx + ); + query_ctx.set_channel(Channel::Jaeger); + let query_ctx = Arc::new(query_ctx); + let db = query_ctx.get_db_string(); + + // Record the query time histogram. + let _timer = METRIC_JAEGER_QUERY_ELAPSED + .with_label_values(&[&db, "/api/traces"]) + .start_timer(); + + match handler.get_trace(query_ctx, &trace_id).await { + Ok(output) => match covert_to_records(output).await { + Ok(Some(records)) => match traces_from_records(records) { + Ok(traces) => ( + StatusCode::OK, + axum::Json(JaegerAPIResponse { + data: Some(JaegerData::Traces(traces)), + ..Default::default() + }), + ), + Err(err) => { + error!("Failed to get trace '{}': {:?}", trace_id, err); + error_response(err) + } + }, + Ok(None) => (StatusCode::OK, axum::Json(JaegerAPIResponse::default())), + Err(err) => { + error!("Failed to get trace '{}': {:?}", trace_id, err); + error_response(err) + } + }, + Err(err) => error_response(err), + } +} + +/// Handle the GET `/api/traces` request. +#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))] +pub async fn handle_find_traces( + State(handler): State, + Query(query_params): Query, + Extension(mut query_ctx): Extension, +) -> impl IntoResponse { + debug!( + "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}", + query_params, query_ctx + ); + query_ctx.set_channel(Channel::Jaeger); + let query_ctx = Arc::new(query_ctx); + let db = query_ctx.get_db_string(); + + // Record the query time histogram. 
+ let _timer = METRIC_JAEGER_QUERY_ELAPSED + .with_label_values(&[&db, "/api/traces"]) + .start_timer(); + + match QueryTraceParams::from_jaeger_query_params(&db, query_params) { + Ok(query_params) => { + let output = handler.find_traces(query_ctx, query_params).await; + match output { + Ok(output) => match covert_to_records(output).await { + Ok(Some(records)) => match traces_from_records(records) { + Ok(traces) => ( + StatusCode::OK, + axum::Json(JaegerAPIResponse { + data: Some(JaegerData::Traces(traces)), + ..Default::default() + }), + ), + Err(err) => { + error!("Failed to find traces: {:?}", err); + error_response(err) + } + }, + Ok(None) => (StatusCode::OK, axum::Json(JaegerAPIResponse::default())), + Err(err) => error_response(err), + }, + Err(err) => { + error!("Failed to find traces: {:?}", err); + error_response(err) + } + } + } + Err(e) => error_response(e), + } +} + +/// Handle the GET `/api/operations` request. +#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))] +pub async fn handle_get_operations( + State(handler): State, + Query(query_params): Query, + Extension(mut query_ctx): Extension, +) -> impl IntoResponse { + debug!( + "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}", + query_params, query_ctx + ); + if let Some(service_name) = query_params.service_name { + query_ctx.set_channel(Channel::Jaeger); + let query_ctx = Arc::new(query_ctx); + let db = query_ctx.get_db_string(); + + // Record the query time histogram. 
+ let _timer = METRIC_JAEGER_QUERY_ELAPSED + .with_label_values(&[&db, "/api/operations"]) + .start_timer(); + + match handler + .get_operations(query_ctx, &service_name, query_params.span_kind.as_deref()) + .await + { + Ok(output) => match covert_to_records(output).await { + Ok(Some(records)) => match operations_from_records(records, true) { + Ok(operations) => { + let total = operations.len(); + ( + StatusCode::OK, + axum::Json(JaegerAPIResponse { + data: Some(JaegerData::Operations(operations)), + total, + ..Default::default() + }), + ) + } + Err(err) => { + error!("Failed to get operations: {:?}", err); + error_response(err) + } + }, + Ok(None) => (StatusCode::OK, axum::Json(JaegerAPIResponse::default())), + Err(err) => error_response(err), + }, + Err(err) => { + error!( + "Failed to get operations for service '{}': {:?}", + service_name, err + ); + error_response(err) + } + } + } else { + ( + StatusCode::BAD_REQUEST, + axum::Json(JaegerAPIResponse { + errors: vec![JaegerAPIError { + code: 400, + msg: "parameter 'service' is required".to_string(), + trace_id: None, + }], + ..Default::default() + }), + ) + } +} + +/// Handle the GET `/api/services/{service_name}/operations` request. +#[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "jaeger", request_type = "get_operations_by_service") +)] +pub async fn handle_get_operations_by_service( + State(handler): State, + Path(service_name): Path, + Query(query_params): Query, + Extension(mut query_ctx): Extension, +) -> impl IntoResponse { + debug!( + "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}", + service_name, query_params, query_ctx + ); + query_ctx.set_channel(Channel::Jaeger); + let query_ctx = Arc::new(query_ctx); + let db = query_ctx.get_db_string(); + + // Record the query time histogram. 
+ let _timer = METRIC_JAEGER_QUERY_ELAPSED + .with_label_values(&[&db, "/api/services"]) + .start_timer(); + + match handler.get_operations(query_ctx, &service_name, None).await { + Ok(output) => match covert_to_records(output).await { + Ok(Some(records)) => match operations_from_records(records, false) { + Ok(operations) => { + let operations: Vec = + operations.into_iter().map(|op| op.name).collect(); + let total = operations.len(); + ( + StatusCode::OK, + axum::Json(JaegerAPIResponse { + data: Some(JaegerData::OperationsNames(operations)), + total, + ..Default::default() + }), + ) + } + Err(err) => { + error!( + "Failed to get operations for service '{}': {:?}", + service_name, err + ); + error_response(err) + } + }, + Ok(None) => (StatusCode::OK, axum::Json(JaegerAPIResponse::default())), + Err(err) => error_response(err), + }, + Err(err) => { + error!( + "Failed to get operations for service '{}': {:?}", + service_name, err + ); + error_response(err) + } + } +} + +async fn covert_to_records(output: Output) -> Result> { + match output.data { + OutputData::Stream(stream) => { + let records = HttpRecordsOutput::try_new( + stream.schema().clone(), + util::collect(stream) + .await + .context(CollectRecordbatchSnafu)?, + )?; + debug!("The query records: {:?}", records); + Ok(Some(records)) + } + // It's unlikely to happen. However, if the output is not a stream, return None. + _ => Ok(None), + } +} + +fn error_response(err: Error) -> (StatusCode, axum::Json) { + ( + status_code_to_http_status(&err.status_code()), + axum::Json(JaegerAPIResponse { + errors: vec![JaegerAPIError { + code: err.status_code() as i32, + msg: err.to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) +} +// Construct Jaeger traces from records. 
+fn traces_from_records(records: HttpRecordsOutput) -> Result> { + let expected_schema = vec![ + (TRACE_ID_COLUMN, "String"), + (TIMESTAMP_COLUMN, "TimestampNanosecond"), + (DURATION_NANO_COLUMN, "UInt64"), + (SERVICE_NAME_COLUMN, "String"), + (SPAN_NAME_COLUMN, "String"), + (SPAN_ID_COLUMN, "String"), + (SPAN_ATTRIBUTES_COLUMN, "Json"), + ]; + check_schema(&records, &expected_schema)?; + + // maintain the mapping: trace_id -> (process_id -> service_name). + let mut trace_id_to_processes: HashMap> = HashMap::new(); + // maintain the mapping: trace_id -> spans. + let mut trace_id_to_spans: HashMap> = HashMap::new(); + + for row in records.rows.into_iter() { + let mut span = Span::default(); + let mut row_iter = row.into_iter(); + + // Set trace id. + if let Some(JsonValue::String(trace_id)) = row_iter.next() { + span.trace_id = trace_id.clone(); + trace_id_to_processes.entry(trace_id).or_default(); + } + + // Convert timestamp from nanoseconds to microseconds. + if let Some(JsonValue::Number(timestamp)) = row_iter.next() { + span.start_time = timestamp.as_u64().ok_or_else(|| { + InvalidJaegerQuerySnafu { + reason: "Failed to convert timestamp to u64".to_string(), + } + .build() + })? / 1000; + } + + // Convert duration from nanoseconds to microseconds. + if let Some(JsonValue::Number(duration)) = row_iter.next() { + span.duration = duration.as_u64().ok_or_else(|| { + InvalidJaegerQuerySnafu { + reason: "Failed to convert duration to u64".to_string(), + } + .build() + })? / 1000; + } + + // Collect services to construct processes. + if let Some(JsonValue::String(service_name)) = row_iter.next() { + if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) { + if let Some(process_id) = process.get(&service_name) { + span.process_id = process_id.clone(); + } else { + // Allocate a new process id. 
+ let process_id = format!("p{}", process.len() + 1); + process.insert(service_name, process_id.clone()); + span.process_id = process_id; + } + } + } + + // Set operation name. In Jaeger, the operation name is the span name. + if let Some(JsonValue::String(span_name)) = row_iter.next() { + span.operation_name = span_name; + } + + // Set span id. + if let Some(JsonValue::String(span_id)) = row_iter.next() { + span.span_id = span_id; + } + + // Convert span attributes to tags. + if let Some(JsonValue::Object(object)) = row_iter.next() { + let tags = object + .into_iter() + .filter_map(|(key, value)| match value { + JsonValue::String(value) => Some(KeyValue { + key, + value_type: ValueType::String, + value: Value::String(value.to_string()), + }), + JsonValue::Number(value) => Some(KeyValue { + key, + value_type: ValueType::Int64, + value: Value::Int64(value.as_i64().unwrap_or(0)), + }), + JsonValue::Bool(value) => Some(KeyValue { + key, + value_type: ValueType::Boolean, + value: Value::Boolean(value), + }), + // FIXME(zyy17): Do we need to support other types? 
+ _ => { + warn!("Unsupported value type: {:?}", value); + None + } + }) + .collect(); + span.tags = tags; + } + + if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) { + spans.push(span); + } else { + trace_id_to_spans.insert(span.trace_id.clone(), vec![span]); + } + } + + let mut traces = Vec::new(); + for (trace_id, spans) in trace_id_to_spans { + let mut trace = Trace { + trace_id, + spans, + ..Default::default() + }; + + if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) { + let mut process_id_to_process = HashMap::new(); + for (service_name, process_id) in processes.into_iter() { + process_id_to_process.insert( + process_id, + Process { + service_name, + tags: vec![], + }, + ); + } + trace.processes = process_id_to_process; + } + traces.push(trace); + } + + Ok(traces) +} + +fn services_from_records(records: HttpRecordsOutput) -> Result> { + let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")]; + check_schema(&records, &expected_schema)?; + + let mut services = Vec::with_capacity(records.total_rows); + for row in records.rows.into_iter() { + for value in row.into_iter() { + if let JsonValue::String(service_name) = value { + services.push(service_name); + } + } + } + Ok(services) +} + +// Construct Jaeger operations from records. 
+//
+// The first cell of a row is the operation (span) name and the second one the span kind.
+// The span kind is only reported when `contain_span_kind` is true. Rows whose first cell
+// is not a string are skipped.
+fn operations_from_records(
+    records: HttpRecordsOutput,
+    contain_span_kind: bool,
+) -> Result<Vec<Operation>> {
+    let expected_schema = vec![
+        (SPAN_NAME_COLUMN, "String"),
+        (SPAN_KIND_COLUMN, "String"),
+        (SERVICE_NAME_COLUMN, "String"),
+    ];
+    check_schema(&records, &expected_schema)?;
+
+    let mut operations = Vec::with_capacity(records.total_rows);
+    for row in records.rows {
+        let mut cells = row.into_iter();
+        if let Some(JsonValue::String(name)) = cells.next() {
+            // The span kind cell is always consumed, but only used when requested.
+            let span_kind = match cells.next() {
+                Some(JsonValue::String(kind)) if contain_span_kind => {
+                    Some(normalize_span_kind(&kind))
+                }
+                _ => None,
+            };
+            operations.push(Operation { name, span_kind });
+        }
+    }
+
+    Ok(operations)
+}
+
+// Check whether the schema of the records is correct.
+//
+// Every returned column must match the expected (name, data type) at the same position.
+// A result with fewer columns than expected is accepted as long as it is a prefix of the
+// expected schema; a result with more columns is rejected instead of indexing past the
+// end of `expected_schema`.
+fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
+    let columns = &records.schema.column_schemas;
+    let is_valid = columns.len() <= expected_schema.len()
+        && columns
+            .iter()
+            .zip(expected_schema)
+            .all(|(column, (name, data_type))| {
+                column.name == *name && column.data_type == *data_type
+            });
+
+    if !is_valid {
+        return InvalidJaegerQuerySnafu {
+            reason: "query result schema is not correct".to_string(),
+        }
+        .fail();
+    }
+    Ok(())
+}
+
+// By default, the span kind is stored as `SPAN_KIND_` in GreptimeDB.
+// However, in Jaeger API, the span kind is returned as `` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
+fn normalize_span_kind(span_kind: &str) -> String {
+    // Drop the `SPAN_KIND_` prefix when present. A value without the prefix is unlikely,
+    // but it is still lowercased for consistency.
+    span_kind
+        .strip_prefix(SPAN_KIND_PREFIX)
+        .unwrap_or(span_kind)
+        .to_lowercase()
+}
+
+// Convert a JSON string holding a number into a JSON number: integers are tried first,
+// then floats. Returns `None` for non-string input and for strings that are not numeric
+// or not representable as a JSON number (`from_f64` rejects them).
+// NOTE(review): the `parse` target types (`i64`, `f64`) are reconstructed from usage —
+// confirm against the upstream definition.
+fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
+    let text = input.as_str()?;
+    if let Ok(integer) = text.parse::<i64>() {
+        return Some(serde_json::Value::Number(serde_json::Number::from(integer)));
+    }
+
+    text.parse::<f64>()
+        .ok()
+        .and_then(serde_json::Number::from_f64)
+        .map(serde_json::Value::Number)
+}
+
+// Convert the JSON strings `"true"` / `"false"` into JSON booleans. Anything else,
+// including differently-cased spellings, yields `None`.
+fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
+    match input.as_str()? {
+        "true" => Some(serde_json::Value::Bool(true)),
+        "false" => Some(serde_json::Value::Bool(false)),
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_catalog::consts::DEFAULT_SCHEMA_NAME;
+    use serde_json::{json, Number, Value as JsonValue};
+
+    use super::*;
+    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
+
+    #[test]
+    fn test_services_from_records() {
+        // The tests is the tuple of `(test_records, expected)`.
+        let tests = vec![(
+            HttpRecordsOutput {
+                schema: OutputSchema {
+                    column_schemas: vec![ColumnSchema {
+                        name: "service_name".to_string(),
+                        data_type: "String".to_string(),
+                    }],
+                },
+                rows: vec![
+                    vec![JsonValue::String("test-service-0".to_string())],
+                    vec![JsonValue::String("test-service-1".to_string())],
+                ],
+                total_rows: 2,
+                metrics: HashMap::new(),
+            },
+            vec!["test-service-0".to_string(), "test-service-1".to_string()],
+        )];
+
+        for (records, expected) in tests {
+            let services = services_from_records(records).unwrap();
+            assert_eq!(services, expected);
+        }
+    }
+
+    #[test]
+    fn test_operations_from_records() {
+        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
+ let tests = vec![ + ( + HttpRecordsOutput { + schema: OutputSchema { + column_schemas: vec![ + ColumnSchema { + name: "span_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_kind".to_string(), + data_type: "String".to_string(), + }, + ], + }, + rows: vec![ + vec![ + JsonValue::String("access-mysql".to_string()), + JsonValue::String("SPAN_KIND_SERVER".to_string()), + ], + vec![ + JsonValue::String("access-redis".to_string()), + JsonValue::String("SPAN_KIND_CLIENT".to_string()), + ], + ], + total_rows: 2, + metrics: HashMap::new(), + }, + false, + vec![ + Operation { + name: "access-mysql".to_string(), + span_kind: None, + }, + Operation { + name: "access-redis".to_string(), + span_kind: None, + }, + ], + ), + ( + HttpRecordsOutput { + schema: OutputSchema { + column_schemas: vec![ + ColumnSchema { + name: "span_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_kind".to_string(), + data_type: "String".to_string(), + }, + ], + }, + rows: vec![ + vec![ + JsonValue::String("access-mysql".to_string()), + JsonValue::String("SPAN_KIND_SERVER".to_string()), + ], + vec![ + JsonValue::String("access-redis".to_string()), + JsonValue::String("SPAN_KIND_CLIENT".to_string()), + ], + ], + total_rows: 2, + metrics: HashMap::new(), + }, + true, + vec![ + Operation { + name: "access-mysql".to_string(), + span_kind: Some("server".to_string()), + }, + Operation { + name: "access-redis".to_string(), + span_kind: Some("client".to_string()), + }, + ], + ), + ]; + + for (records, contain_span_kind, expected) in tests { + let operations = operations_from_records(records, contain_span_kind).unwrap(); + assert_eq!(operations, expected); + } + } + + #[test] + fn test_traces_from_records() { + // The tests is the tuple of `(test_records, expected)`. 
+ let tests = vec![( + HttpRecordsOutput { + schema: OutputSchema { + column_schemas: vec![ + ColumnSchema { + name: "trace_id".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "timestamp".to_string(), + data_type: "TimestampNanosecond".to_string(), + }, + ColumnSchema { + name: "duration_nano".to_string(), + data_type: "UInt64".to_string(), + }, + ColumnSchema { + name: "service_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_id".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_attributes".to_string(), + data_type: "Json".to_string(), + }, + ], + }, + rows: vec![ + vec![ + JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()), + JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()), + JsonValue::Number(Number::from_u128(100000000).unwrap()), + JsonValue::String("test-service-0".to_string()), + JsonValue::String("access-mysql".to_string()), + JsonValue::String("008421dbbd33a3e9".to_string()), + JsonValue::Object( + json!({ + "operation.type": "access-mysql", + }) + .as_object() + .unwrap() + .clone(), + ), + ], + vec![ + JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()), + JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()), + JsonValue::Number(Number::from_u128(100000000).unwrap()), + JsonValue::String("test-service-0".to_string()), + JsonValue::String("access-redis".to_string()), + JsonValue::String("ffa03416a7b9ea48".to_string()), + JsonValue::Object( + json!({ + "operation.type": "access-redis", + }) + .as_object() + .unwrap() + .clone(), + ), + ], + ], + total_rows: 2, + metrics: HashMap::new(), + }, + vec![Trace { + trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(), + spans: vec![ + Span { + trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(), + span_id: "008421dbbd33a3e9".to_string(), + 
operation_name: "access-mysql".to_string(), + start_time: 1738726754492422, + duration: 100000, + tags: vec![KeyValue { + key: "operation.type".to_string(), + value_type: ValueType::String, + value: Value::String("access-mysql".to_string()), + }], + process_id: "p1".to_string(), + ..Default::default() + }, + Span { + trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(), + span_id: "ffa03416a7b9ea48".to_string(), + operation_name: "access-redis".to_string(), + start_time: 1738726754642422, + duration: 100000, + tags: vec![KeyValue { + key: "operation.type".to_string(), + value_type: ValueType::String, + value: Value::String("access-redis".to_string()), + }], + process_id: "p1".to_string(), + ..Default::default() + }, + ], + processes: HashMap::from([( + "p1".to_string(), + Process { + service_name: "test-service-0".to_string(), + tags: vec![], + }, + )]), + ..Default::default() + }], + )]; + + for (records, expected) in tests { + let traces = traces_from_records(records).unwrap(); + assert_eq!(traces, expected); + } + } + + #[test] + fn test_from_jaeger_query_params() { + // The tests is the tuple of `(test_query_params, expected)`. 
+ let tests = vec![ + ( + JaegerQueryParams { + service_name: Some("test-service-0".to_string()), + ..Default::default() + }, + QueryTraceParams { + db: DEFAULT_SCHEMA_NAME.to_string(), + service_name: "test-service-0".to_string(), + ..Default::default() + }, + ), + ( + JaegerQueryParams { + service_name: Some("test-service-0".to_string()), + operation_name: Some("access-mysql".to_string()), + start: Some(1738726754492422), + end: Some(1738726754642422), + max_duration: Some("100ms".to_string()), + min_duration: Some("50ms".to_string()), + limit: Some(10), + tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()), + ..Default::default() + }, + QueryTraceParams { + db: DEFAULT_SCHEMA_NAME.to_string(), + service_name: "test-service-0".to_string(), + operation_name: Some("access-mysql".to_string()), + start_time: Some(1738726754492422000), + end_time: Some(1738726754642422000), + min_duration: Some(50000000), + max_duration: Some(100000000), + limit: Some(10), + tags: Some(HashMap::from([ + ("http.status_code".to_string(), JsonValue::Number(Number::from(200))), + ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())), + ("error".to_string(), JsonValue::Bool(false)), + ("http.method".to_string(), JsonValue::String("GET".to_string())), + ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())), + ])), + }, + ), + ]; + + for (query_params, expected) in tests { + let query_params = + QueryTraceParams::from_jaeger_query_params(DEFAULT_SCHEMA_NAME, query_params) + .unwrap(); + assert_eq!(query_params, expected); + } + } + + #[test] + fn test_check_schema() { + // The tests is the tuple of `(test_records, expected_schema, is_ok)`. 
+ let tests = vec![( + HttpRecordsOutput { + schema: OutputSchema { + column_schemas: vec![ + ColumnSchema { + name: "trace_id".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "timestamp".to_string(), + data_type: "TimestampNanosecond".to_string(), + }, + ColumnSchema { + name: "duration_nano".to_string(), + data_type: "UInt64".to_string(), + }, + ColumnSchema { + name: "service_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_name".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_id".to_string(), + data_type: "String".to_string(), + }, + ColumnSchema { + name: "span_attributes".to_string(), + data_type: "Json".to_string(), + }, + ], + }, + rows: vec![], + total_rows: 0, + metrics: HashMap::new(), + }, + vec![ + (TRACE_ID_COLUMN, "String"), + (TIMESTAMP_COLUMN, "TimestampNanosecond"), + (DURATION_NANO_COLUMN, "UInt64"), + (SERVICE_NAME_COLUMN, "String"), + (SPAN_NAME_COLUMN, "String"), + (SPAN_ID_COLUMN, "String"), + (SPAN_ATTRIBUTES_COLUMN, "Json"), + ], + true, + )]; + + for (records, expected_schema, is_ok) in tests { + let result = check_schema(&records, &expected_schema); + assert_eq!(result.is_ok(), is_ok); + } + } + + #[test] + fn test_normalize_span_kind() { + let tests = vec![ + ("SPAN_KIND_SERVER".to_string(), "server".to_string()), + ("SPAN_KIND_CLIENT".to_string(), "client".to_string()), + ]; + + for (input, expected) in tests { + let result = normalize_span_kind(&input); + assert_eq!(result, expected); + } + } + + #[test] + fn test_convert_string_to_number() { + let tests = vec![ + ( + JsonValue::String("123".to_string()), + Some(JsonValue::Number(Number::from(123))), + ), + ( + JsonValue::String("123.456".to_string()), + Some(JsonValue::Number(Number::from_f64(123.456).unwrap())), + ), + ]; + + for (input, expected) in tests { + let result = convert_string_to_number(&input); + assert_eq!(result, expected); + } + } + + #[test] + fn 
test_convert_string_to_boolean() { + let tests = vec![ + ( + JsonValue::String("true".to_string()), + Some(JsonValue::Bool(true)), + ), + ( + JsonValue::String("false".to_string()), + Some(JsonValue::Bool(false)), + ), + ]; + + for (input, expected) in tests { + let result = convert_string_to_boolean(&input); + assert_eq!(result, expected); + } + } +} diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 0489bb0d80..da465f5707 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -272,6 +272,12 @@ lazy_static! { vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); + pub static ref METRIC_JAEGER_QUERY_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_servers_jaeger_query_elapsed", + "servers jaeger query elapsed", + &[METRIC_DB_LABEL, METRIC_PATH_LABEL] + ) +.unwrap(); } // Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index aeb31a3796..b1bff7344b 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -25,7 +25,20 @@ use crate::otlp::utils::{make_column_data, make_string_column_data}; use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; + pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; +pub const SERVICE_NAME_COLUMN: &str = "service_name"; +pub const TRACE_ID_COLUMN: &str = "trace_id"; +pub const TIMESTAMP_COLUMN: &str = "timestamp"; +pub const DURATION_NANO_COLUMN: &str = "duration_nano"; +pub const SPAN_ID_COLUMN: &str = "span_id"; +pub const SPAN_NAME_COLUMN: &str = "span_name"; +pub const SPAN_KIND_COLUMN: &str = "span_kind"; +pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes"; + +/// The span kind prefix in the database. +/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database. 
+pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_"; pub mod attributes; pub mod span; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index dd41305626..b6ee77aa2b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -46,16 +46,17 @@ use serde_json::Value; use session::context::{QueryContext, QueryContextRef}; use crate::error::Result; +use crate::http::jaeger::QueryTraceParams; use crate::influxdb::InfluxdbRequest; use crate::opentsdb::codec::DataPoint; use crate::prom_store::Metrics; - pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PromStoreProtocolHandlerRef = Arc; pub type OpenTelemetryProtocolHandlerRef = Arc; pub type PipelineHandlerRef = Arc; pub type LogQueryHandlerRef = Arc; +pub type JaegerQueryHandlerRef = Arc; #[async_trait] pub trait InfluxdbLineProtocolHandler { @@ -170,3 +171,28 @@ pub trait PipelineHandler { pub trait LogQueryHandler { async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result; } + +/// Handle Jaeger query requests. +#[async_trait] +pub trait JaegerQueryHandler { + /// Get trace services. It's used for `/api/services` API. + async fn get_services(&self, ctx: QueryContextRef) -> Result; + + /// Get Jaeger operations. It's used for `/api/operations` and `/api/services/{service_name}/operations` API. + async fn get_operations( + &self, + ctx: QueryContextRef, + service_name: &str, + span_kind: Option<&str>, + ) -> Result; + + /// Get trace by trace id. It's used for `/api/traces/{trace_id}` API. + async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result; + + /// Find traces by query params. It's used for `/api/traces` API. 
+ async fn find_traces( + &self, + ctx: QueryContextRef, + query_params: QueryTraceParams, + ) -> Result; +} diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 3544228afe..9893e25061 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -399,6 +399,7 @@ pub enum Channel { Opentsdb = 8, Loki = 9, Elasticsearch = 10, + Jaeger = 11, } impl From for Channel { @@ -414,6 +415,7 @@ impl From for Channel { 8 => Self::Opentsdb, 9 => Self::Loki, 10 => Self::Elasticsearch, + 11 => Self::Jaeger, _ => Self::Unknown, } } @@ -442,6 +444,7 @@ impl Display for Channel { Channel::Opentsdb => write!(f, "opentsdb"), Channel::Loki => write!(f, "loki"), Channel::Elasticsearch => write!(f, "elasticsearch"), + Channel::Jaeger => write!(f, "jaeger"), Channel::Unknown => write!(f, "unknown"), } } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 2118bb6a28..bc7fa65cd6 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -431,6 +431,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( .with_log_ingest_handler(instance.instance.clone(), None, None) .with_logs_handler(instance.instance.clone()) .with_otlp_handler(instance.instance.clone()) + .with_jaeger_handler(instance.instance.clone()) .with_greptime_config_options(instance.opts.to_toml().unwrap()); if let Some(user_provider) = user_provider { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 4186729519..db8ee39e10 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -103,6 +103,7 @@ macro_rules! 
http_tests { test_elasticsearch_logs, test_elasticsearch_logs_with_index, test_log_query, + test_jaeger_query_api, ); )* }; @@ -949,6 +950,9 @@ enable = true [influxdb] enable = true +[jaeger] +enable = true + [prom_store] enable = true with_metric_engine = true @@ -2505,6 +2509,347 @@ pub async fn test_log_query(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_jaeger_query_api(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_jaeger_query_api").await; + + let client = TestClient::new(app).await; + + let content = r#" + { + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "test-jaeger-query-api" + } + } + ] + }, + "scopeSpans": [ + { + "scope": { + "name": "test-jaeger-query-api", + "version": "1.0.0" + }, + "spans": [ + { + "traceId": "5611dce1bc9ebed65352d99a027b08ea", + "spanId": "008421dbbd33a3e9", + "name": "access-mysql", + "kind": 2, + "startTimeUnixNano": "1738726754492422000", + "endTimeUnixNano": "1738726754592422000", + "attributes": [ + { + "key": "operation.type", + "value": { + "stringValue": "access-mysql" + } + }, + { + "key": "net.peer.ip", + "value": { + "stringValue": "1.2.3.4" + } + }, + { + "key": "peer.service", + "value": { + "stringValue": "test-jaeger-query-api" + } + } + ], + "status": { + "message": "success", + "code": 0 + } + } + ] + }, + { + "scope": { + "name": "test-jaeger-query-api", + "version": "1.0.0" + }, + "spans": [ + { + "traceId": "5611dce1bc9ebed65352d99a027b08ea", + "spanId": "ffa03416a7b9ea48", + "name": "access-redis", + "kind": 2, + "startTimeUnixNano": "1738726754492422000", + "endTimeUnixNano": "1738726754592422000", + "attributes": [ + { + "key": "operation.type", + "value": { + "stringValue": "access-redis" + } + }, + { + "key": "net.peer.ip", + "value": { + "stringValue": "1.2.3.4" + } + }, + { + "key": "peer.service", + 
"value": { + "stringValue": "test-jaeger-query-api" + } + } + ], + "status": { + "message": "success", + "code": 0 + } + } + ] + } + ], + "schemaUrl": "https://opentelemetry.io/schemas/1.4.0" + } + ] + } + "#; + + let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + // write traces data. + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // Test `/api/services` API. + let res = client.get("/v1/jaeger/api/services").send().await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#" + { + "data": [ + "test-jaeger-query-api" + ], + "total": 1, + "limit": 0, + "offset": 0, + "errors": [] + } + "#; + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + // Test `/api/operations` API. + let res = client + .get("/v1/jaeger/api/operations?service=test-jaeger-query-api") + .send() + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#" + { + "data": [ + { + "name": "access-mysql", + "spanKind": "server" + }, + { + "name": "access-redis", + "spanKind": "server" + } + ], + "total": 2, + "limit": 0, + "offset": 0, + "errors": [] + } + "#; + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + // Test `/api/services/{service_name}/operations` API. 
+ let res = client + .get("/v1/jaeger/api/services/test-jaeger-query-api/operations") + .send() + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#" + { + "data": [ + "access-mysql", + "access-redis" + ], + "total": 2, + "limit": 0, + "offset": 0, + "errors": [] + } + "#; + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + // Test `/api/traces/{trace_id}` API. + let res = client + .get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea") + .send() + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#" + { + "data": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spans": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", + "references": [], + "startTime": 1738726754492422, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-mysql" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + } + ], + "logs": [], + "processID": "p1" + }, + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "ffa03416a7b9ea48", + "operationName": "access-redis", + "references": [], + "startTime": 1738726754492422, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-redis" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + } + ], + "logs": [], + "processID": "p1" + } + ], + "processes": { + "p1": { + "serviceName": "test-jaeger-query-api", + "tags": [] + } + } + } + ], + "total": 0, + "limit": 0, + "offset": 0, + "errors": [] + } + "#; + + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let 
expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + // Test `/api/traces` API. + let res = client + .get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492422&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D") + .send() + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#" + { + "data": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spans": [ + { + "traceID": "5611dce1bc9ebed65352d99a027b08ea", + "spanID": "008421dbbd33a3e9", + "operationName": "access-mysql", + "references": [], + "startTime": 1738726754492422, + "duration": 100000, + "tags": [ + { + "key": "net.peer.ip", + "type": "string", + "value": "1.2.3.4" + }, + { + "key": "operation.type", + "type": "string", + "value": "access-mysql" + }, + { + "key": "peer.service", + "type": "string", + "value": "test-jaeger-query-api" + } + ], + "logs": [], + "processID": "p1" + } + ], + "processes": { + "p1": { + "serviceName": "test-jaeger-query-api", + "tags": [] + } + } + } + ], + "total": 0, + "limit": 0, + "offset": 0, + "errors": [] + } + "#; + + let resp: Value = serde_json::from_str(&res.text().await).unwrap(); + let expected: Value = serde_json::from_str(expected).unwrap(); + assert_eq!(resp, expected); + + guard.remove_all().await; +} + async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str())