refactor: add pipeline concept to OTLP traces and remove OTLP over gRPC (#5605)

This commit is contained in:
Ning Sun
2025-02-27 06:01:45 -08:00
committed by GitHub
parent c0c87652c3
commit c9d70e0e28
17 changed files with 369 additions and 301 deletions

View File

@@ -36,11 +36,11 @@ use servers::error::{
TableNotFoundSnafu,
};
use servers::http::jaeger::QueryTraceParams;
use servers::otlp::trace::{
use servers::otlp::trace::v0::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use servers::otlp::trace::TRACE_TABLE_NAME;
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

View File

@@ -72,7 +72,10 @@ impl OpenTelemetryProtocolHandler for Instance {
#[tracing::instrument(skip_all)]
async fn traces(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
@@ -87,9 +90,14 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let spans = otlp::trace::parse(request);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
table_name,
&ctx,
pipeline_handler,
)?;
OTLP_TRACES_ROWS.inc_by(rows as u64);

View File

@@ -164,7 +164,6 @@ where
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otlp_handler(self.instance.clone(), user_provider)
.flight_handler(Arc::new(greptime_request_handler))
.build();
Ok(grpc_server)

View File

@@ -20,12 +20,9 @@ pub mod processor;
pub mod transform;
pub mod value;
use std::sync::Arc;
use error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use itertools::Itertools;
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::{Transformer, Transforms};
@@ -34,7 +31,6 @@ use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::etl::error::Result;
use crate::{GreptimeTransformer, PipelineVersion};
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
@@ -214,57 +210,6 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
/// The key is used to uplift value from the attributes and serve as column name in the table
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}
/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();
SelectInfo { keys }
}
}
impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Enum for holding information of a pipeline, which is either pipeline itself,
/// or information that be used to retrieve a pipeline from `PipelineHandler`
pub enum PipelineDefinition {
Resolved(Arc<Pipeline<GreptimeTransformer>>),
ByNameAndValue((String, PipelineVersion)),
GreptimeIdentityPipeline,
}
impl PipelineDefinition {
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
Self::GreptimeIdentityPipeline
} else {
Self::ByNameAndValue((name.to_owned(), version))
}
}
}
pub enum PipelineWay {
OtlpLogDirect(Box<SelectInfo>),
Pipeline(PipelineDefinition),
}
#[cfg(test)]
mod tests {
use api::v1::Rows;

View File

@@ -25,10 +25,10 @@ pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap,
PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};

View File

@@ -16,6 +16,8 @@ use std::sync::Arc;
use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use util::to_pipeline_version;
use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};
@@ -37,3 +39,78 @@ pub type PipelineInfo = (Timestamp, PipelineRef);
pub type PipelineTableRef = Arc<PipelineTable>;
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
/// The key is used to uplift value from the attributes and serve as column name in the table
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}
/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();
SelectInfo { keys }
}
}
impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
/// Enum for holding information of a pipeline, which is either pipeline itself,
/// or information that be used to retrieve a pipeline from `PipelineHandler`
pub enum PipelineDefinition {
Resolved(Arc<Pipeline<GreptimeTransformer>>),
ByNameAndValue((String, PipelineVersion)),
GreptimeIdentityPipeline,
}
impl PipelineDefinition {
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
Self::GreptimeIdentityPipeline
} else {
Self::ByNameAndValue((name.to_owned(), version))
}
}
}
pub enum PipelineWay {
OtlpLogDirect(Box<SelectInfo>),
Pipeline(PipelineDefinition),
OtlpTraceDirectV0,
OtlpTraceDirectV1,
}
impl PipelineWay {
pub fn from_name_and_default(
name: Option<&str>,
version: Option<&str>,
default_pipeline: PipelineWay,
) -> error::Result<PipelineWay> {
if let Some(pipeline_name) = name {
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
Ok(PipelineWay::OtlpTraceDirectV1)
} else {
Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
pipeline_name,
to_pipeline_version(version)?,
)))
}
} else {
Ok(default_pipeline)
}
}
}

View File

@@ -23,10 +23,10 @@ use crate::table::{
};
use crate::PipelineVersion;
pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion> {
match version_str {
Some(version) => {
let ts = Timestamp::from_str_utc(&version)
let ts = Timestamp::from_str_utc(version)
.map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
Ok(Some(TimestampNanosecond(ts)))
}
@@ -73,14 +73,14 @@ mod tests {
assert!(none_result.is_ok());
assert!(none_result.unwrap().is_none());
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string()));
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z"));
assert!(some_result.is_ok());
assert_eq!(
some_result.unwrap(),
Some(TimestampNanosecond::new(1672531200000000000))
);
let invalid = to_pipeline_version(Some("invalid".to_string()));
let invalid = to_pipeline_version(Some("invalid"));
assert!(invalid.is_err());
}

View File

@@ -18,7 +18,6 @@ mod cancellation;
mod database;
pub mod flight;
pub mod greptime_handler;
mod otlp;
pub mod prom_query_gateway;
pub mod region_server;

View File

@@ -29,12 +29,6 @@ pub struct AuthMiddlewareLayer {
user_provider: Option<UserProviderRef>,
}
impl AuthMiddlewareLayer {
pub fn with(user_provider: Option<UserProviderRef>) -> Self {
Self { user_provider }
}
}
impl<S> Layer<S> for AuthMiddlewareLayer {
type Service = AuthMiddleware<S>;

View File

@@ -19,25 +19,18 @@ use arrow_flight::flight_service_server::FlightServiceServer;
use auth::UserProviderRef;
use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::service::RoutesBuilder;
use tonic::transport::{Identity, ServerTlsConfig};
use tower::ServiceBuilder;
use super::flight::{FlightCraftRef, FlightCraftWrapper};
use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use super::{GrpcServer, GrpcServerConfig};
use crate::grpc::authorize::AuthMiddlewareLayer;
use crate::grpc::database::DatabaseService;
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::otlp::OtlpService;
use crate::grpc::prom_query_gateway::PrometheusGatewayService;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::tls::TlsOption;
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
@@ -127,37 +120,6 @@ impl GrpcServerBuilder {
self
}
/// Add handler for OpenTelemetry Protocol (OTLP) requests.
pub fn otlp_handler(
mut self,
otlp_handler: OpenTelemetryProtocolHandlerRef,
user_provider: Option<UserProviderRef>,
) -> Self {
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
.service(tracing_service);
self.routes_builder.add_service(trace_server);
let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider))
.service(metrics_service);
self.routes_builder.add_service(metrics_server);
self
}
pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
&mut self.routes_builder
}

View File

@@ -205,7 +205,7 @@ pub async fn delete_pipeline(
reason: "version is required",
})?;
let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?;
let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
@@ -445,8 +445,8 @@ pub async fn pipeline_dryrun(
match params.pipeline {
None => {
let version =
to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
let version = to_pipeline_version(params.pipeline_version.as_deref())
.context(PipelineSnafu)?;
let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
@@ -486,7 +486,8 @@ pub async fn pipeline_dryrun(
// is specified using query param.
let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let version =
to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
@@ -532,7 +533,7 @@ pub async fn log_ingester(
reason: "table is required",
})?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);

View File

@@ -34,11 +34,11 @@ use crate::error::{
};
use crate::http::HttpRecordsOutput;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
use crate::otlp::trace::v0::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::JaegerQueryHandlerRef;
/// JaegerAPIResponse is the response of Jaeger HTTP API.

View File

@@ -29,8 +29,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::{PipelineDefinition, PipelineWay};
use pipeline::PipelineWay;
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
@@ -75,6 +74,7 @@ pub async fn metrics(
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
TraceTableName(table_name): TraceTableName,
pipeline_info: PipelineInfo,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
@@ -88,8 +88,29 @@ pub async fn traces(
.start_timer();
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpTraceDirectV0,
)
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;
// here we use nightly feature `trait_upcasting` to convert handler to
// pipeline_handler
let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
handler
.traces(request, table_name, query_ctx)
.traces(
pipeline_handler,
request,
pipeline,
pipeline_params,
table_name,
query_ctx,
)
.await
.map(|o| OtlpResponse {
resp_body: ExportTraceServiceResponse {
@@ -118,15 +139,12 @@ pub async fn logs(
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
let pipeline = if let Some(pipeline_name) = pipeline_info.pipeline_name {
PipelineWay::Pipeline(PipelineDefinition::from_name(
&pipeline_name,
to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?,
))
} else {
PipelineWay::OtlpLogDirect(Box::new(select_info))
};
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpLogDirect(Box::new(select_info)),
)
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;
// here we use nightly feature `trait_upcasting` to convert handler to

View File

@@ -32,7 +32,8 @@ use snafu::{ensure, ResultExt};
use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, PipelineTransformSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
UnsupportedJsonDataTypeForTagSnafu,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;
@@ -98,6 +99,10 @@ pub async fn to_grpc_insert_requests(
let insert_requests = RowInsertRequests { inserts };
Ok((insert_requests, len))
}
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for logs",
}
.fail(),
}
}

View File

@@ -12,183 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use self::span::{parse_span, TraceSpan, TraceSpans};
use crate::error::Result;
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
pub mod attributes;
pub mod span;
pub mod v0;
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
use api::v1::RowInsertRequests;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
use crate::error::{NotSupportedSnafu, Result};
use crate::query_handler::PipelineHandlerRef;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
spans: TraceSpans,
query_ctx: &QueryContextRef,
pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
// write ts
row_writer::write_ts_to_nanos(
writer,
"timestamp",
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
// write ts fields
let fields = vec![
make_column_data(
"timestamp_end",
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
match pipeline {
PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
table_name,
query_ctx,
pipeline_handler,
),
make_column_data(
"duration_nano",
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for logs",
}
.fail(),
}
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
// write fields
let fields = vec![
make_string_column_data("span_kind", span.span_kind),
make_string_column_data("span_name", span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"span_attributes",
span.span_attributes.into(),
&mut row,
)?;
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
// write fields
let fields = vec![
make_string_column_data("scope_name", span.scope_name),
make_string_column_data("scope_version", span.scope_version),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"scope_attributes",
span.scope_attributes.into(),
&mut row,
)?;
row_writer::write_json(
writer,
"resource_attributes",
span.resource_attributes.into(),
&mut row,
)?;
writer.add_row(row);
Ok(())
}

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
use super::span::{parse_span, TraceSpan, TraceSpans};
use crate::error::Result;
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn v0_to_grpc_insert_requests(
request: ExportTraceServiceRequest,
_pipeline: PipelineWay,
_pipeline_params: GreptimePipelineParams,
table_name: String,
_query_ctx: &QueryContextRef,
_pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
let spans = parse(request);
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
// write ts
row_writer::write_ts_to_nanos(
writer,
"timestamp",
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
// write ts fields
let fields = vec![
make_column_data(
"timestamp_end",
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
),
make_column_data(
"duration_nano",
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
}
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
// write fields
let fields = vec![
make_string_column_data("span_kind", span.span_kind),
make_string_column_data("span_name", span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"span_attributes",
span.span_attributes.into(),
&mut row,
)?;
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
// write fields
let fields = vec![
make_string_column_data("scope_name", span.scope_name),
make_string_column_data("scope_version", span.scope_version),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"scope_attributes",
span.scope_attributes.into(),
&mut row,
)?;
row_writer::write_json(
writer,
"resource_attributes",
span.resource_attributes.into(),
&mut row,
)?;
writer.add_row(row);
Ok(())
}

View File

@@ -107,7 +107,10 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
/// Handling opentelemetry traces request
async fn traces(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> Result<Output>;