feat: implement Jaeger query APIs (#5452)

* feat: implement jaeger query api

* test: add some unit tests

* test: add integration tests for jaeger query APIs

* refactor: parse tags from url parameters

* refactor: support querying traces by tags

* refactor: add limit parameter

* refactor: add jaeger query api metrics

* chore: add some doc comments and a default limit value

* test: add more unit tests

* docs: add jaeger options in config docs

* refactor: code review

* wip

* refactor: use datafusion's dataframe APIs to query traces

* refactor: code review

* chore: format test cases

* refactor: add check_schema()

* chore: fix clippy errors and rename function

* refactor: throw error when converting start_time and duration fails

* chore: modify incorrect request type name

* chore: remove unnecessary serde rename

* refactor: add some important comments

* refactor: add SPAN_KIND_PREFIX

* refactor: code review
zyy17
2025-02-13 08:36:38 +09:00
committed by GitHub
parent 58c6274bf6
commit 954310f917
23 changed files with 2143 additions and 10 deletions

Cargo.lock (generated)

@@ -4336,6 +4336,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-version",
"datafusion",
"datafusion-expr",
"datanode",
"datatypes",


@@ -60,6 +60,8 @@
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `jaeger` | -- | -- | Jaeger protocol options. |
| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |
| `prom_store.enable` | Bool | `true` | Whether to enable Prometheus remote write and read in HTTP API. |
| `prom_store.with_metric_engine` | Bool | `true` | Whether to store the data from Prometheus remote write in metric engine. |
@@ -256,6 +258,8 @@
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `jaeger` | -- | -- | Jaeger protocol options. |
| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |
| `prom_store.enable` | Bool | `true` | Whether to enable Prometheus remote write and read in HTTP API. |
| `prom_store.with_metric_engine` | Bool | `true` | Whether to store the data from Prometheus remote write in metric engine. |


@@ -138,6 +138,11 @@ enable = true
## Whether to enable InfluxDB protocol in HTTP API.
enable = true
## Jaeger protocol options.
[jaeger]
## Whether to enable Jaeger protocol in HTTP API.
enable = true
## Prometheus remote storage options
[prom_store]
## Whether to enable Prometheus remote write and read in HTTP API.


@@ -142,6 +142,11 @@ enable = true
## Whether to enable InfluxDB protocol in HTTP API.
enable = true
## Jaeger protocol options.
[jaeger]
## Whether to enable Jaeger protocol in HTTP API.
enable = true
## Prometheus remote storage options
[prom_store]
## Whether to enable Prometheus remote write and read in HTTP API.


@@ -60,7 +60,8 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
@@ -140,6 +141,7 @@ pub struct StandaloneOptions {
pub postgres: PostgresOptions,
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub jaeger: JaegerOptions,
pub prom_store: PromStoreOptions,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
@@ -169,6 +171,7 @@ impl Default for StandaloneOptions {
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
jaeger: JaegerOptions::default(),
prom_store: PromStoreOptions::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
@@ -217,6 +220,7 @@ impl StandaloneOptions {
postgres: cloned_opts.postgres,
opentsdb: cloned_opts.opentsdb,
influxdb: cloned_opts.influxdb,
jaeger: cloned_opts.jaeger,
prom_store: cloned_opts.prom_store,
meta_client: None,
logging: cloned_opts.logging,


@@ -13,7 +13,7 @@
// limitations under the License.
use std::sync::Arc;
mod json_get;
pub mod json_get;
mod json_is;
mod json_path_exists;
mod json_path_match;


@@ -35,6 +35,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
@@ -52,6 +53,7 @@ promql-parser.workspace = true
prost.workspace = true
query.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true


@@ -24,7 +24,8 @@ use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use crate::service_config::{
InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions, PromStoreOptions,
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
PromStoreOptions,
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -40,6 +41,7 @@ pub struct FrontendOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub jaeger: JaegerOptions,
pub otlp: OtlpOptions,
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
@@ -62,6 +64,7 @@ impl Default for FrontendOptions {
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
jaeger: JaegerOptions::default(),
prom_store: PromStoreOptions::default(),
otlp: OtlpOptions::default(),
meta_client: None,


@@ -15,6 +15,7 @@
pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
@@ -65,7 +66,7 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler,
InfluxdbLineProtocolHandler, JaegerQueryHandler, LogQueryHandler, OpenTelemetryProtocolHandler,
OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler,
};
use servers::server::ServerHandlers;
@@ -100,6 +101,7 @@ pub trait FrontendInstance:
+ PrometheusHandler
+ PipelineHandler
+ LogQueryHandler
+ JaegerQueryHandler
+ Send
+ Sync
+ 'static
@@ -167,6 +169,10 @@ impl Instance {
&self.catalog_manager
}
pub fn query_engine(&self) -> &QueryEngineRef {
&self.query_engine
}
pub fn plugins(&self) -> Plugins {
self.plugins.clone()
}


@@ -0,0 +1,337 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_function::function::{Function, FunctionRef};
use common_function::scalars::json::json_get::{
JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
};
use common_function::scalars::udf::create_udf;
use common_function::state::FunctionState;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::QueryTraceParams;
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use super::Instance;
const DEFAULT_LIMIT: usize = 100;
#[async_trait]
impl JaegerQueryHandler for Instance {
async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
// It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
vec![col(SERVICE_NAME_COLUMN)],
vec![],
Some(DEFAULT_LIMIT),
None,
true,
)
.await?)
}
async fn get_operations(
&self,
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
) -> ServerResult<Output> {
let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
if let Some(span_kind) = span_kind {
filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
"{}{}",
SPAN_KIND_PREFIX,
span_kind.to_uppercase()
))));
}
// It's equivalent to `SELECT span_name, span_kind, service_name FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
vec![
col(SPAN_NAME_COLUMN),
col(SPAN_KIND_COLUMN),
col(SERVICE_NAME_COLUMN),
],
filters,
Some(DEFAULT_LIMIT),
None,
false,
)
.await?)
}
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
let selects = vec![
col(TRACE_ID_COLUMN),
col(TIMESTAMP_COLUMN),
col(DURATION_NANO_COLUMN),
col(SERVICE_NAME_COLUMN),
col(SPAN_NAME_COLUMN),
col(SPAN_ID_COLUMN),
col(SPAN_ATTRIBUTES_COLUMN),
];
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
selects,
filters,
Some(DEFAULT_LIMIT),
None,
false,
)
.await?)
}
async fn find_traces(
&self,
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects = vec![
col(TRACE_ID_COLUMN),
col(TIMESTAMP_COLUMN),
col(DURATION_NANO_COLUMN),
col(SERVICE_NAME_COLUMN),
col(SPAN_NAME_COLUMN),
col(SPAN_ID_COLUMN),
col(SPAN_ATTRIBUTES_COLUMN),
];
let mut filters = vec![];
if let Some(operation_name) = query_params.operation_name {
filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
}
if let Some(start_time) = query_params.start_time {
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
}
if let Some(end_time) = query_params.end_time {
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
}
if let Some(min_duration) = query_params.min_duration {
filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
}
if let Some(max_duration) = query_params.max_duration {
filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
}
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
selects,
filters,
Some(DEFAULT_LIMIT),
query_params.tags,
false,
)
.await?)
}
}
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
ctx: QueryContextRef,
catalog_manager: &CatalogManagerRef,
query_engine: &QueryEngineRef,
selects: Vec<Expr>,
filters: Vec<Expr>,
limit: Option<usize>,
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
) -> ServerResult<Output> {
let db = ctx.get_db_string();
let table = catalog_manager
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table: TRACE_TABLE_NAME,
catalog: ctx.current_catalog(),
schema: db,
})?;
let df_context = create_df_context(query_engine, ctx.clone())?;
let dataframe = df_context
.read_table(Arc::new(DfTableProviderAdapter::new(table)))
.context(DataFusionSnafu)?;
let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
// Apply all filters.
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
})?;
// Apply the distinct if needed.
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
dataframe
};
// Apply the limit if needed.
let dataframe = if let Some(limit) = limit {
dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
} else {
dataframe
};
// Execute the query and collect the result.
let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
let output = Output::new_with_stream(Box::pin(
RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
));
Ok(output)
}
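
As a point of reference, the helper above reduces to a plain DataFusion select -> filter -> distinct -> limit chain. A minimal standalone sketch (assuming a SessionContext that already has the trace table registered, which is not how the code above obtains it via the catalog and table adapter):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::{col, SessionContext};

// Hypothetical helper mirroring the `distinct = true` path used by `get_services`.
async fn distinct_services(ctx: &SessionContext) -> datafusion::error::Result<Vec<RecordBatch>> {
    ctx.table("opentelemetry_traces")
        .await?
        .select(vec![col("service_name")])?
        .distinct()?
        .limit(0, Some(100))? // DEFAULT_LIMIT
        .collect()
        .await
}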
// The current implementation registers UDFs during the planning stage, which makes it difficult
// to utilize them through DataFrame APIs. To address this limitation, we create a new session
// context and register the required UDFs, allowing them to be decoupled from the global context.
// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
fn create_df_context(
query_engine: &QueryEngineRef,
ctx: QueryContextRef,
) -> ServerResult<SessionContext> {
let df_context = SessionContext::new_with_state(
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
);
// The following JSON UDFs will be used for tags filters.
let udfs: Vec<FunctionRef> = vec![
Arc::new(JsonGetInt),
Arc::new(JsonGetFloat),
Arc::new(JsonGetBool),
Arc::new(JsonGetString),
];
for udf in udfs {
df_context
.register_udf(create_udf(udf, ctx.clone(), Arc::new(FunctionState::default())).into());
}
Ok(df_context)
}
fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
let mut filters = vec![];
// NOTE: Tag keys may contain `.` (for example `http.status_code`), so we use the quoted form `["http.status_code"]` in the JSON path to access the value.
for (key, value) in tags.iter() {
if let JsonValue::String(value) = value {
filters.push(
dataframe
.registry()
.udf(JsonGetString {}.name())
.context(DataFusionSnafu)?
.call(vec![
col(SPAN_ATTRIBUTES_COLUMN),
lit(format!("[\"{}\"]", key)),
])
.eq(lit(value)),
);
}
if let JsonValue::Number(value) = value {
if value.is_i64() {
filters.push(
dataframe
.registry()
.udf(JsonGetInt {}.name())
.context(DataFusionSnafu)?
.call(vec![
col(SPAN_ATTRIBUTES_COLUMN),
lit(format!("[\"{}\"]", key)),
])
.eq(lit(value.as_i64().unwrap())),
);
}
if value.is_f64() {
filters.push(
dataframe
.registry()
.udf(JsonGetFloat {}.name())
.context(DataFusionSnafu)?
.call(vec![
col(SPAN_ATTRIBUTES_COLUMN),
lit(format!("[\"{}\"]", key)),
])
.eq(lit(value.as_f64().unwrap())),
);
}
}
if let JsonValue::Bool(value) = value {
filters.push(
dataframe
.registry()
.udf(JsonGetBool {}.name())
.context(DataFusionSnafu)?
.call(vec![
col(SPAN_ATTRIBUTES_COLUMN),
lit(format!("[\"{}\"]", key)),
])
.eq(lit(*value)),
);
}
}
Ok(filters)
}
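
For illustration, a hypothetical tags parameter and the filters it turns into (one equality expression per entry, dispatched on the JSON value type as above):

use std::collections::HashMap;
use serde_json::{json, Value as JsonValue};

// Hypothetical input, e.g. parsed from `?tags={"operation.type":"access-mysql","retry":2}`.
let tags: HashMap<String, JsonValue> = HashMap::from([
    ("operation.type".to_string(), json!("access-mysql")),
    ("retry".to_string(), json!(2)),
]);
// `tags_filters(&dataframe, tags)` then yields expressions equivalent to:
//   json_get_string(span_attributes, '["operation.type"]') = 'access-mysql'
//   json_get_int(span_attributes, '["retry"]') = 2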


@@ -112,6 +112,11 @@ where
if opts.otlp.enable {
builder = builder.with_otlp_handler(self.instance.clone());
}
if opts.jaeger.enable {
builder = builder.with_jaeger_handler(self.instance.clone());
}
builder
}


@@ -13,6 +13,7 @@
// limitations under the License.
pub mod influxdb;
pub mod jaeger;
pub mod mysql;
pub mod opentsdb;
pub mod otlp;
@@ -20,6 +21,7 @@ pub mod postgres;
pub mod prom_store;
pub use influxdb::InfluxdbOptions;
pub use jaeger::JaegerOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;
pub use otlp::OtlpOptions;


@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
/// Options for Jaeger query APIs.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct JaegerOptions {
/// Whether to enable Jaeger query APIs.
pub enable: bool,
}
impl Default for JaegerOptions {
fn default() -> Self {
Self { enable: true }
}
}
#[cfg(test)]
mod tests {
use super::JaegerOptions;
#[test]
fn test_jaeger_options() {
let default = JaegerOptions::default();
assert!(default.enable);
}
}
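
A small round-trip sketch showing how the struct maps onto the [jaeger] section documented above (assumes the toml crate is available, as elsewhere in the workspace):

let opts: JaegerOptions = toml::from_str("enable = false").unwrap();
assert!(!opts.enable);
let rendered = toml::to_string(&JaegerOptions::default()).unwrap();
assert_eq!(rendered.trim(), "enable = true");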


@@ -292,7 +292,7 @@ impl QueryEngineState {
.unwrap_or(false)
}
pub(crate) fn session_state(&self) -> SessionState {
pub fn session_state(&self) -> SessionState {
self.df_context.state()
}


@@ -25,6 +25,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_telemetry::{error, warn};
use datafusion::error::DataFusionError;
use datatypes::prelude::ConcreteDataType;
use headers::ContentType;
use http::header::InvalidHeaderValue;
@@ -598,6 +599,21 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid Jaeger query, reason: {}", reason))]
InvalidJaegerQuery {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("DataFusion error"))]
DataFusion {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -668,7 +684,8 @@ impl ErrorExt for Error {
| InvalidTableName { .. }
| PrepareStatementNotFound { .. }
| FailedToParseQuery { .. }
| InvalidElasticsearchInput { .. } => StatusCode::InvalidArguments,
| InvalidElasticsearchInput { .. }
| InvalidJaegerQuery { .. } => StatusCode::InvalidArguments,
Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),
@@ -711,7 +728,7 @@ impl ErrorExt for Error {
ConvertScalarValue { source, .. } => source.status_code(),
ToJson { .. } => StatusCode::Internal,
ToJson { .. } | DataFusion { .. } => StatusCode::Internal,
ConvertSqlValue { source, .. } => source.status_code(),


@@ -72,8 +72,9 @@ use crate::metrics_handler::MetricsHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, LogQueryHandlerRef, OpenTelemetryProtocolHandlerRef,
OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef,
InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
PromStoreProtocolHandlerRef,
};
use crate::server::Server;
@@ -86,6 +87,7 @@ mod extractor;
pub mod handler;
pub mod header;
pub mod influxdb;
pub mod jaeger;
pub mod logs;
pub mod loki;
pub mod mem_prof;
@@ -652,6 +654,16 @@ impl HttpServerBuilder {
}
}
pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/jaeger"),
HttpServer::route_jaeger(handler),
),
..self
}
}
pub fn with_extra_router(self, router: Router) -> Self {
Self {
router: self.router.merge(router),
@@ -1054,6 +1066,25 @@ impl HttpServer {
.route("/config", routing::get(handler::config))
.with_state(state)
}
fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
Router::new()
.route("/api/services", routing::get(jaeger::handle_get_services))
.route(
"/api/services/{service_name}/operations",
routing::get(jaeger::handle_get_operations_by_service),
)
.route(
"/api/operations",
routing::get(jaeger::handle_get_operations),
)
.route("/api/traces", routing::get(jaeger::handle_find_traces))
.route(
"/api/traces/{trace_id}",
routing::get(jaeger::handle_get_trace),
)
.with_state(handler)
}
}
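
Because the builder nests this router under /{HTTP_API_VERSION}/jaeger, the full paths (with the v1 API version used by the integration tests below) resolve to:

/v1/jaeger/api/services
/v1/jaeger/api/services/{service_name}/operations
/v1/jaeger/api/operations
/v1/jaeger/api/traces
/v1/jaeger/api/traces/{trace_id}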
pub const HTTP_SERVER: &str = "HTTP_SERVER";

File diff suppressed because it is too large.


@@ -272,6 +272,12 @@ lazy_static! {
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
pub static ref METRIC_JAEGER_QUERY_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_servers_jaeger_query_elapsed",
"servers jaeger query elapsed",
&[METRIC_DB_LABEL, METRIC_PATH_LABEL]
)
.unwrap();
}
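
A hedged sketch of a typical call site for the new histogram (label values here are hypothetical; the real recording happens in the Jaeger HTTP handlers):

// Time one Jaeger query, labeled by database and API path.
let _timer = METRIC_JAEGER_QUERY_ELAPSED
    .with_label_values(&["public", "/api/services"])
    .start_timer();
// ... execute the query; the prometheus HistogramTimer records the
// elapsed seconds when it goes out of scope.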
// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs


@@ -25,7 +25,20 @@ use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
pub mod attributes;
pub mod span;
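
To make the SPAN_KIND_PREFIX convention concrete, this mirrors the mapping `get_operations` applies when filtering by span kind:

// A Jaeger `spanKind=server` query parameter matches rows stored as `SPAN_KIND_SERVER`.
assert_eq!(
    format!("{}{}", SPAN_KIND_PREFIX, "server".to_uppercase()),
    "SPAN_KIND_SERVER"
);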


@@ -46,16 +46,17 @@ use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use crate::error::Result;
use crate::http::jaeger::QueryTraceParams;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prom_store::Metrics;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
#[async_trait]
pub trait InfluxdbLineProtocolHandler {
@@ -170,3 +171,28 @@ pub trait PipelineHandler {
pub trait LogQueryHandler {
async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
}
/// Handle Jaeger query requests.
#[async_trait]
pub trait JaegerQueryHandler {
/// Get trace services. It's used for `/api/services` API.
async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
/// Get Jaeger operations. It's used for the `/api/operations` and `/api/services/{service_name}/operations` APIs.
async fn get_operations(
&self,
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
) -> Result<Output>;
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
/// Find traces by query params. It's used for `/api/traces` API.
async fn find_traces(
&self,
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> Result<Output>;
}
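
A minimal usage sketch (hypothetical free function; the actual call sites are the axum handlers in servers::http::jaeger):

// Any holder of a `JaegerQueryHandlerRef` can serve the Jaeger
// `/api/services` endpoint by delegating to the trait object.
async fn list_services(handler: JaegerQueryHandlerRef, ctx: QueryContextRef) -> Result<Output> {
    handler.get_services(ctx).await
}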


@@ -399,6 +399,7 @@ pub enum Channel {
Opentsdb = 8,
Loki = 9,
Elasticsearch = 10,
Jaeger = 11,
}
impl From<u32> for Channel {
@@ -414,6 +415,7 @@ impl From<u32> for Channel {
8 => Self::Opentsdb,
9 => Self::Loki,
10 => Self::Elasticsearch,
11 => Self::Jaeger,
_ => Self::Unknown,
}
}
@@ -442,6 +444,7 @@ impl Display for Channel {
Channel::Opentsdb => write!(f, "opentsdb"),
Channel::Loki => write!(f, "loki"),
Channel::Elasticsearch => write!(f, "elasticsearch"),
Channel::Jaeger => write!(f, "jaeger"),
Channel::Unknown => write!(f, "unknown"),
}
}


@@ -431,6 +431,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
.with_log_ingest_handler(instance.instance.clone(), None, None)
.with_logs_handler(instance.instance.clone())
.with_otlp_handler(instance.instance.clone())
.with_jaeger_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
if let Some(user_provider) = user_provider {


@@ -103,6 +103,7 @@ macro_rules! http_tests {
test_elasticsearch_logs,
test_elasticsearch_logs_with_index,
test_log_query,
test_jaeger_query_api,
);
)*
};
@@ -949,6 +950,9 @@ enable = true
[influxdb]
enable = true
[jaeger]
enable = true
[prom_store]
enable = true
with_metric_engine = true
@@ -2505,6 +2509,347 @@ pub async fn test_log_query(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_jaeger_query_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_jaeger_query_api").await;
let client = TestClient::new(app).await;
let content = r#"
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-mysql"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
},
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "ffa03416a7b9ea48",
"name": "access-redis",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-redis"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
}
]
}
"#;
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
vec![(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
)],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// Test `/api/services` API.
let res = client.get("/v1/jaeger/api/services").send().await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"test-jaeger-query-api"
],
"total": 1,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/operations` API.
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-redis",
"spanKind": "server"
}
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"access-mysql",
"access-redis"
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492422&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())