diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index b82cf65e2d..8e7c350386 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -36,11 +36,11 @@ use servers::error::{ TableNotFoundSnafu, }; use servers::http::jaeger::QueryTraceParams; -use servers::otlp::trace::{ +use servers::otlp::trace::v0::{ DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, - TRACE_TABLE_NAME, }; +use servers::otlp::trace::TRACE_TABLE_NAME; use servers::query_handler::JaegerQueryHandler; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index fff075cac6..f1642da1cb 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -72,7 +72,10 @@ impl OpenTelemetryProtocolHandler for Instance { #[tracing::instrument(skip_all)] async fn traces( &self, + pipeline_handler: PipelineHandlerRef, request: ExportTraceServiceRequest, + pipeline: PipelineWay, + pipeline_params: GreptimePipelineParams, table_name: String, ctx: QueryContextRef, ) -> ServerResult { @@ -87,9 +90,14 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; - let spans = otlp::trace::parse(request); - - let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?; + let (requests, rows) = otlp::trace::to_grpc_insert_requests( + request, + pipeline, + pipeline_params, + table_name, + &ctx, + pipeline_handler, + )?; OTLP_TRACES_ROWS.inc_by(rows as u64); diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 1ca3d40e9a..e38c7ea217 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -164,7 +164,6 @@ where let grpc_server = builder .database_handler(greptime_request_handler.clone()) .prometheus_handler(self.instance.clone(), user_provider.clone()) - .otlp_handler(self.instance.clone(), user_provider) .flight_handler(Arc::new(greptime_request_handler)) .build(); Ok(grpc_server) diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 56ec4539a0..5493dbbdf4 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -20,12 +20,9 @@ pub mod processor; pub mod transform; pub mod value; -use std::sync::Arc; - use error::{ IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu, }; -use itertools::Itertools; use processor::{Processor, Processors}; use snafu::{ensure, OptionExt, ResultExt}; use transform::{Transformer, Transforms}; @@ -34,7 +31,6 @@ use yaml_rust::YamlLoader; use crate::dispatcher::{Dispatcher, Rule}; use crate::etl::error::Result; -use crate::{GreptimeTransformer, PipelineVersion}; const DESCRIPTION: &str = "description"; const PROCESSORS: &str = "processors"; @@ -214,57 +210,6 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str .context(IntermediateKeyIndexSnafu { kind, key }) } -/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs -/// The key is used to uplift value from the attributes and serve as column name in the table -#[derive(Default)] -pub struct SelectInfo { - pub keys: Vec, -} - -/// Try to convert a string to SelectInfo -/// The string should be a comma-separated list of keys -/// example: "key1,key2,key3" -/// The keys will be sorted and deduplicated -impl From for SelectInfo { - fn from(value: String) -> Self { - let mut keys: Vec = value.split(',').map(|s| s.to_string()).sorted().collect(); - keys.dedup(); - - SelectInfo { keys } - } -} - -impl SelectInfo { - pub fn is_empty(&self) -> bool { - self.keys.is_empty() - } -} - -pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; - -/// Enum for holding information of a pipeline, which is either pipeline itself, -/// or information that be used to retrieve a pipeline from `PipelineHandler` -pub enum PipelineDefinition { - Resolved(Arc>), - ByNameAndValue((String, PipelineVersion)), - GreptimeIdentityPipeline, -} - -impl PipelineDefinition { - pub fn from_name(name: &str, version: PipelineVersion) -> Self { - if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - Self::GreptimeIdentityPipeline - } else { - Self::ByNameAndValue((name.to_owned(), version)) - } - } -} - -pub enum PipelineWay { - OtlpLogDirect(Box), - Pipeline(PipelineDefinition), -} - #[cfg(test)] mod tests { use api::v1::Rows; diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 2b358c4572..ebcbcd332c 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -25,10 +25,10 @@ pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; pub use etl::{ error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse, - Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap, - PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap, }; pub use manager::{ - error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef, - PipelineVersion, + error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef, + PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo, + GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index 6938d52870..77d0f9b9f6 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use common_time::Timestamp; use datatypes::timestamp::TimestampNanosecond; +use itertools::Itertools; +use util::to_pipeline_version; use crate::table::PipelineTable; use crate::{GreptimeTransformer, Pipeline}; @@ -37,3 +39,78 @@ pub type PipelineInfo = (Timestamp, PipelineRef); pub type PipelineTableRef = Arc; pub type PipelineRef = Arc>; + +/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs +/// The key is used to uplift value from the attributes and serve as column name in the table +#[derive(Default)] +pub struct SelectInfo { + pub keys: Vec, +} + +/// Try to convert a string to SelectInfo +/// The string should be a comma-separated list of keys +/// example: "key1,key2,key3" +/// The keys will be sorted and deduplicated +impl From for SelectInfo { + fn from(value: String) -> Self { + let mut keys: Vec = value.split(',').map(|s| s.to_string()).sorted().collect(); + keys.dedup(); + + SelectInfo { keys } + } +} + +impl SelectInfo { + pub fn is_empty(&self) -> bool { + self.keys.is_empty() + } +} + +pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; +pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1"; + +/// Enum for holding information of a pipeline, which is either pipeline itself, +/// or information that be used to retrieve a pipeline from `PipelineHandler` +pub enum PipelineDefinition { + Resolved(Arc>), + ByNameAndValue((String, PipelineVersion)), + GreptimeIdentityPipeline, +} + +impl PipelineDefinition { + pub fn from_name(name: &str, version: PipelineVersion) -> Self { + if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { + Self::GreptimeIdentityPipeline + } else { + Self::ByNameAndValue((name.to_owned(), version)) + } + } +} + +pub enum PipelineWay { + OtlpLogDirect(Box), + Pipeline(PipelineDefinition), + OtlpTraceDirectV0, + OtlpTraceDirectV1, +} + +impl PipelineWay { + pub fn from_name_and_default( + name: Option<&str>, + version: Option<&str>, + default_pipeline: PipelineWay, + ) -> error::Result { + if let Some(pipeline_name) = name { + if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME { + Ok(PipelineWay::OtlpTraceDirectV1) + } else { + Ok(PipelineWay::Pipeline(PipelineDefinition::from_name( + pipeline_name, + to_pipeline_version(version)?, + ))) + } + } else { + Ok(default_pipeline) + } + } +} diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs index a7d968edcf..37aa5967e8 100644 --- a/src/pipeline/src/manager/util.rs +++ b/src/pipeline/src/manager/util.rs @@ -23,10 +23,10 @@ use crate::table::{ }; use crate::PipelineVersion; -pub fn to_pipeline_version(version_str: Option) -> Result { +pub fn to_pipeline_version(version_str: Option<&str>) -> Result { match version_str { Some(version) => { - let ts = Timestamp::from_str_utc(&version) + let ts = Timestamp::from_str_utc(version) .map_err(|_| InvalidPipelineVersionSnafu { version }.build())?; Ok(Some(TimestampNanosecond(ts))) } @@ -73,14 +73,14 @@ mod tests { assert!(none_result.is_ok()); assert!(none_result.unwrap().is_none()); - let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string())); + let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z")); assert!(some_result.is_ok()); assert_eq!( some_result.unwrap(), Some(TimestampNanosecond::new(1672531200000000000)) ); - let invalid = to_pipeline_version(Some("invalid".to_string())); + let invalid = to_pipeline_version(Some("invalid")); assert!(invalid.is_err()); } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index bdf07502af..0d7d185d76 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -18,7 +18,6 @@ mod cancellation; mod database; pub mod flight; pub mod greptime_handler; -mod otlp; pub mod prom_query_gateway; pub mod region_server; diff --git a/src/servers/src/grpc/authorize.rs b/src/servers/src/grpc/authorize.rs index ab1eb1cd8c..bc8872b19e 100644 --- a/src/servers/src/grpc/authorize.rs +++ b/src/servers/src/grpc/authorize.rs @@ -29,12 +29,6 @@ pub struct AuthMiddlewareLayer { user_provider: Option, } -impl AuthMiddlewareLayer { - pub fn with(user_provider: Option) -> Self { - Self { user_provider } - } -} - impl Layer for AuthMiddlewareLayer { type Service = AuthMiddleware; diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index 6c2c04635e..b19cc280c6 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -19,25 +19,18 @@ use arrow_flight::flight_service_server::FlightServiceServer; use auth::UserProviderRef; use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result}; use common_runtime::Runtime; -use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer; -use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use snafu::ResultExt; use tokio::sync::Mutex; -use tonic::codec::CompressionEncoding; use tonic::service::RoutesBuilder; use tonic::transport::{Identity, ServerTlsConfig}; -use tower::ServiceBuilder; use super::flight::{FlightCraftRef, FlightCraftWrapper}; use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler}; use super::{GrpcServer, GrpcServerConfig}; -use crate::grpc::authorize::AuthMiddlewareLayer; use crate::grpc::database::DatabaseService; use crate::grpc::greptime_handler::GreptimeRequestHandler; -use crate::grpc::otlp::OtlpService; use crate::grpc::prom_query_gateway::PrometheusGatewayService; use crate::prometheus_handler::PrometheusHandlerRef; -use crate::query_handler::OpenTelemetryProtocolHandlerRef; use crate::tls::TlsOption; /// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]). @@ -127,37 +120,6 @@ impl GrpcServerBuilder { self } - /// Add handler for OpenTelemetry Protocol (OTLP) requests. - pub fn otlp_handler( - mut self, - otlp_handler: OpenTelemetryProtocolHandlerRef, - user_provider: Option, - ) -> Self { - let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone())) - .accept_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Gzip) - .send_compressed(CompressionEncoding::Zstd); - - let trace_server = ServiceBuilder::new() - .layer(AuthMiddlewareLayer::with(user_provider.clone())) - .service(tracing_service); - self.routes_builder.add_service(trace_server); - - let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler)) - .accept_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Gzip) - .send_compressed(CompressionEncoding::Zstd); - - let metrics_server = ServiceBuilder::new() - .layer(AuthMiddlewareLayer::with(user_provider)) - .service(metrics_service); - self.routes_builder.add_service(metrics_server); - - self - } - pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder { &mut self.routes_builder } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index d6d8e89a56..92a6432baa 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -205,7 +205,7 @@ pub async fn delete_pipeline( reason: "version is required", })?; - let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?; + let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?; query_ctx.set_channel(Channel::Http); let query_ctx = Arc::new(query_ctx); @@ -445,8 +445,8 @@ pub async fn pipeline_dryrun( match params.pipeline { None => { - let version = - to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?; + let version = to_pipeline_version(params.pipeline_version.as_deref()) + .context(PipelineSnafu)?; let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?; let pipeline = handler .get_pipeline(&pipeline_name, version, query_ctx.clone()) @@ -486,7 +486,8 @@ pub async fn pipeline_dryrun( // is specified using query param. let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?; - let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?; + let version = + to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?; let ignore_errors = query_params.ignore_errors.unwrap_or(false); @@ -532,7 +533,7 @@ pub async fn log_ingester( reason: "table is required", })?; - let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?; + let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?; let ignore_errors = query_params.ignore_errors.unwrap_or(false); diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs index ed03f93aff..9ae56703b5 100644 --- a/src/servers/src/http/jaeger.rs +++ b/src/servers/src/http/jaeger.rs @@ -34,11 +34,11 @@ use crate::error::{ }; use crate::http::HttpRecordsOutput; use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED; -use crate::otlp::trace::{ +use crate::otlp::trace::v0::{ DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, - TRACE_TABLE_NAME, }; +use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::JaegerQueryHandlerRef; /// JaegerAPIResponse is the response of Jaeger HTTP API. diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index acb571a57d..491e6e4868 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -29,8 +29,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; -use pipeline::util::to_pipeline_version; -use pipeline::{PipelineDefinition, PipelineWay}; +use pipeline::PipelineWay; use prost::Message; use session::context::{Channel, QueryContext}; use snafu::prelude::*; @@ -75,6 +74,7 @@ pub async fn metrics( pub async fn traces( State(handler): State, TraceTableName(table_name): TraceTableName, + pipeline_info: PipelineInfo, Extension(mut query_ctx): Extension, bytes: Bytes, ) -> Result> { @@ -88,8 +88,29 @@ pub async fn traces( .start_timer(); let request = ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; + + let pipeline = PipelineWay::from_name_and_default( + pipeline_info.pipeline_name.as_deref(), + pipeline_info.pipeline_version.as_deref(), + PipelineWay::OtlpTraceDirectV0, + ) + .context(PipelineSnafu)?; + + let pipeline_params = pipeline_info.pipeline_params; + + // here we use nightly feature `trait_upcasting` to convert handler to + // pipeline_handler + let pipeline_handler: Arc = handler.clone(); + handler - .traces(request, table_name, query_ctx) + .traces( + pipeline_handler, + request, + pipeline, + pipeline_params, + table_name, + query_ctx, + ) .await .map(|o| OtlpResponse { resp_body: ExportTraceServiceResponse { @@ -118,15 +139,12 @@ pub async fn logs( .start_timer(); let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; - let pipeline = if let Some(pipeline_name) = pipeline_info.pipeline_name { - PipelineWay::Pipeline(PipelineDefinition::from_name( - &pipeline_name, - to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?, - )) - } else { - PipelineWay::OtlpLogDirect(Box::new(select_info)) - }; - + let pipeline = PipelineWay::from_name_and_default( + pipeline_info.pipeline_name.as_deref(), + pipeline_info.pipeline_version.as_deref(), + PipelineWay::OtlpLogDirect(Box::new(select_info)), + ) + .context(PipelineSnafu)?; let pipeline_params = pipeline_info.pipeline_params; // here we use nightly feature `trait_upcasting` to convert handler to diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index bad05e88ab..c4b59a866a 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -32,7 +32,8 @@ use snafu::{ensure, ResultExt}; use super::trace::attributes::OtlpAnyValue; use super::utils::{bytes_to_hex_string, key_value_to_jsonb}; use crate::error::{ - IncompatibleSchemaSnafu, PipelineTransformSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, + IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result, + UnsupportedJsonDataTypeForTagSnafu, }; use crate::pipeline::run_pipeline; use crate::query_handler::PipelineHandlerRef; @@ -98,6 +99,10 @@ pub async fn to_grpc_insert_requests( let insert_requests = RowInsertRequests { inserts }; Ok((insert_requests, len)) } + _ => NotSupportedSnafu { + feat: "Unsupported pipeline for logs", + } + .fail(), } } diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index b1bff7344b..1ec8ce4825 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -12,183 +12,42 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::value::ValueData; -use api::v1::{ColumnDataType, RowInsertRequests}; -use common_grpc::precision::Precision; -use itertools::Itertools; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use opentelemetry_proto::tonic::common::v1::any_value; - -use self::span::{parse_span, TraceSpan, TraceSpans}; -use crate::error::Result; -use crate::otlp::utils::{make_column_data, make_string_column_data}; -use crate::row_writer::{self, MultiTableData, TableData}; - -const APPROXIMATE_COLUMN_COUNT: usize = 24; - -pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; -pub const SERVICE_NAME_COLUMN: &str = "service_name"; -pub const TRACE_ID_COLUMN: &str = "trace_id"; -pub const TIMESTAMP_COLUMN: &str = "timestamp"; -pub const DURATION_NANO_COLUMN: &str = "duration_nano"; -pub const SPAN_ID_COLUMN: &str = "span_id"; -pub const SPAN_NAME_COLUMN: &str = "span_name"; -pub const SPAN_KIND_COLUMN: &str = "span_kind"; -pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes"; - -/// The span kind prefix in the database. -/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database. -pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_"; - pub mod attributes; pub mod span; +pub mod v0; -/// Convert OpenTelemetry traces to SpanTraces -/// -/// See -/// -/// for data structure of OTLP traces. -pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { - let span_size = request - .resource_spans - .iter() - .flat_map(|res| res.scope_spans.iter()) - .flat_map(|scope| scope.spans.iter()) - .count(); - let mut spans = Vec::with_capacity(span_size); - for resource_spans in request.resource_spans { - let resource_attrs = resource_spans - .resource - .map(|r| r.attributes) - .unwrap_or_default(); - let service_name = resource_attrs - .iter() - .find_or_first(|kv| kv.key == "service.name") - .and_then(|kv| kv.value.clone()) - .and_then(|v| match v.value { - Some(any_value::Value::StringValue(s)) => Some(s), - Some(any_value::Value::BytesValue(b)) => { - Some(String::from_utf8_lossy(&b).to_string()) - } - _ => None, - }); +use api::v1::RowInsertRequests; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use pipeline::{GreptimePipelineParams, PipelineWay}; +use session::context::QueryContextRef; - for scope_spans in resource_spans.scope_spans { - let scope = scope_spans.scope.unwrap_or_default(); - for span in scope_spans.spans { - spans.push(parse_span( - service_name.clone(), - &resource_attrs, - &scope, - span, - )); - } - } - } - spans -} +use crate::error::{NotSupportedSnafu, Result}; +use crate::query_handler::PipelineHandlerRef; + +pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; /// Convert SpanTraces to GreptimeDB row insert requests. /// Returns `InsertRequests` and total number of rows to ingest pub fn to_grpc_insert_requests( + request: ExportTraceServiceRequest, + pipeline: PipelineWay, + pipeline_params: GreptimePipelineParams, table_name: String, - spans: TraceSpans, + query_ctx: &QueryContextRef, + pipeline_handler: PipelineHandlerRef, ) -> Result<(RowInsertRequests, usize)> { - let mut multi_table_writer = MultiTableData::default(); - let one_table_writer = multi_table_writer.get_or_default_table_data( - table_name, - APPROXIMATE_COLUMN_COUNT, - spans.len(), - ); - - for span in spans { - write_span_to_row(one_table_writer, span)?; - } - - Ok(multi_table_writer.into_row_insert_requests()) -} - -pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { - let mut row = writer.alloc_one_row(); - - // write ts - row_writer::write_ts_to_nanos( - writer, - "timestamp", - Some(span.start_in_nanosecond as i64), - Precision::Nanosecond, - &mut row, - )?; - // write ts fields - let fields = vec![ - make_column_data( - "timestamp_end", - ColumnDataType::TimestampNanosecond, - ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64), + match pipeline { + PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests( + request, + pipeline, + pipeline_params, + table_name, + query_ctx, + pipeline_handler, ), - make_column_data( - "duration_nano", - ColumnDataType::Uint64, - ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond), - ), - ]; - row_writer::write_fields(writer, fields.into_iter(), &mut row)?; - - if let Some(service_name) = span.service_name { - row_writer::write_tag(writer, "service_name", service_name, &mut row)?; + _ => NotSupportedSnafu { + feat: "Unsupported pipeline for logs", + } + .fail(), } - - // tags - let iter = vec![ - ("trace_id", span.trace_id), - ("span_id", span.span_id), - ("parent_span_id", span.parent_span_id), - ] - .into_iter() - .map(|(col, val)| (col.to_string(), val)); - row_writer::write_tags(writer, iter, &mut row)?; - - // write fields - let fields = vec![ - make_string_column_data("span_kind", span.span_kind), - make_string_column_data("span_name", span.span_name), - make_string_column_data("span_status_code", span.span_status_code), - make_string_column_data("span_status_message", span.span_status_message), - make_string_column_data("trace_state", span.trace_state), - ]; - row_writer::write_fields(writer, fields.into_iter(), &mut row)?; - - row_writer::write_json( - writer, - "span_attributes", - span.span_attributes.into(), - &mut row, - )?; - row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; - row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; - - // write fields - let fields = vec![ - make_string_column_data("scope_name", span.scope_name), - make_string_column_data("scope_version", span.scope_version), - ]; - row_writer::write_fields(writer, fields.into_iter(), &mut row)?; - - row_writer::write_json( - writer, - "scope_attributes", - span.scope_attributes.into(), - &mut row, - )?; - - row_writer::write_json( - writer, - "resource_attributes", - span.resource_attributes.into(), - &mut row, - )?; - - writer.add_row(row); - - Ok(()) } diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs new file mode 100644 index 0000000000..5783d5f241 --- /dev/null +++ b/src/servers/src/otlp/trace/v0.rs @@ -0,0 +1,198 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, RowInsertRequests}; +use common_grpc::precision::Precision; +use itertools::Itertools; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value; +use pipeline::{GreptimePipelineParams, PipelineWay}; +use session::context::QueryContextRef; + +use super::span::{parse_span, TraceSpan, TraceSpans}; +use crate::error::Result; +use crate::otlp::utils::{make_column_data, make_string_column_data}; +use crate::query_handler::PipelineHandlerRef; +use crate::row_writer::{self, MultiTableData, TableData}; + +const APPROXIMATE_COLUMN_COUNT: usize = 24; + +pub const SERVICE_NAME_COLUMN: &str = "service_name"; +pub const TRACE_ID_COLUMN: &str = "trace_id"; +pub const TIMESTAMP_COLUMN: &str = "timestamp"; +pub const DURATION_NANO_COLUMN: &str = "duration_nano"; +pub const SPAN_ID_COLUMN: &str = "span_id"; +pub const SPAN_NAME_COLUMN: &str = "span_name"; +pub const SPAN_KIND_COLUMN: &str = "span_kind"; +pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes"; + +/// The span kind prefix in the database. +/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database. +pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_"; + +/// Convert OpenTelemetry traces to SpanTraces +/// +/// See +/// +/// for data structure of OTLP traces. +pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { + let span_size = request + .resource_spans + .iter() + .flat_map(|res| res.scope_spans.iter()) + .flat_map(|scope| scope.spans.iter()) + .count(); + let mut spans = Vec::with_capacity(span_size); + for resource_spans in request.resource_spans { + let resource_attrs = resource_spans + .resource + .map(|r| r.attributes) + .unwrap_or_default(); + let service_name = resource_attrs + .iter() + .find_or_first(|kv| kv.key == "service.name") + .and_then(|kv| kv.value.clone()) + .and_then(|v| match v.value { + Some(any_value::Value::StringValue(s)) => Some(s), + Some(any_value::Value::BytesValue(b)) => { + Some(String::from_utf8_lossy(&b).to_string()) + } + _ => None, + }); + + for scope_spans in resource_spans.scope_spans { + let scope = scope_spans.scope.unwrap_or_default(); + for span in scope_spans.spans { + spans.push(parse_span( + service_name.clone(), + &resource_attrs, + &scope, + span, + )); + } + } + } + spans +} + +/// Convert SpanTraces to GreptimeDB row insert requests. +/// Returns `InsertRequests` and total number of rows to ingest +pub fn v0_to_grpc_insert_requests( + request: ExportTraceServiceRequest, + _pipeline: PipelineWay, + _pipeline_params: GreptimePipelineParams, + table_name: String, + _query_ctx: &QueryContextRef, + _pipeline_handler: PipelineHandlerRef, +) -> Result<(RowInsertRequests, usize)> { + let spans = parse(request); + let mut multi_table_writer = MultiTableData::default(); + let one_table_writer = multi_table_writer.get_or_default_table_data( + table_name, + APPROXIMATE_COLUMN_COUNT, + spans.len(), + ); + + for span in spans { + write_span_to_row(one_table_writer, span)?; + } + + Ok(multi_table_writer.into_row_insert_requests()) +} + +pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { + let mut row = writer.alloc_one_row(); + + // write ts + row_writer::write_ts_to_nanos( + writer, + "timestamp", + Some(span.start_in_nanosecond as i64), + Precision::Nanosecond, + &mut row, + )?; + // write ts fields + let fields = vec![ + make_column_data( + "timestamp_end", + ColumnDataType::TimestampNanosecond, + ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64), + ), + make_column_data( + "duration_nano", + ColumnDataType::Uint64, + ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond), + ), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + if let Some(service_name) = span.service_name { + row_writer::write_tag(writer, "service_name", service_name, &mut row)?; + } + + // tags + let iter = vec![ + ("trace_id", span.trace_id), + ("span_id", span.span_id), + ("parent_span_id", span.parent_span_id), + ] + .into_iter() + .map(|(col, val)| (col.to_string(), val)); + row_writer::write_tags(writer, iter, &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("span_kind", span.span_kind), + make_string_column_data("span_name", span.span_name), + make_string_column_data("span_status_code", span.span_status_code), + make_string_column_data("span_status_message", span.span_status_message), + make_string_column_data("trace_state", span.trace_state), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "span_attributes", + span.span_attributes.into(), + &mut row, + )?; + row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; + row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("scope_name", span.scope_name), + make_string_column_data("scope_version", span.scope_version), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "scope_attributes", + span.scope_attributes.into(), + &mut row, + )?; + + row_writer::write_json( + writer, + "resource_attributes", + span.resource_attributes.into(), + &mut row, + )?; + + writer.add_row(row); + + Ok(()) +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index b6ee77aa2b..f9b3a5637d 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -107,7 +107,10 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler { /// Handling opentelemetry traces request async fn traces( &self, + pipeline_handler: PipelineHandlerRef, request: ExportTraceServiceRequest, + pipeline: PipelineWay, + pipeline_params: GreptimePipelineParams, table_name: String, ctx: QueryContextRef, ) -> Result;