mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
feat: partial success in trace ingestion (#7892)
* feat: impl partial success Signed-off-by: shuiyisong <xixing.sys@gmail.com> * refactor: grouping by resource and scope Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: remove unused code Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: rebase main & fix clippy Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add trace ingestion failure counter Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: address comments Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: update status list and remove TODO Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: address comments Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: fmt Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add more tests Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: fmt Signed-off-by: shuiyisong <xixing.sys@gmail.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5255,6 +5255,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper-util",
|
||||
"itertools 0.14.0",
|
||||
"lazy_static",
|
||||
"log-query",
|
||||
"meta-client",
|
||||
|
||||
@@ -52,6 +52,7 @@ futures.workspace = true
|
||||
hostname.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static.workspace = true
|
||||
log-query.workspace = true
|
||||
meta-client.workspace = true
|
||||
|
||||
@@ -19,9 +19,11 @@ use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use client::Output;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
|
||||
use common_telemetry::tracing;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
@@ -30,17 +32,57 @@ use servers::error::{self, AuthSnafu, Result as ServerResult};
|
||||
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
|
||||
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
|
||||
use servers::otlp;
|
||||
use servers::otlp::trace::TraceAuxData;
|
||||
use servers::otlp::trace::coerce::{
|
||||
coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
|
||||
trace_value_datatype,
|
||||
};
|
||||
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
|
||||
use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup};
|
||||
use servers::query_handler::{
|
||||
OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome,
|
||||
};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
|
||||
|
||||
use crate::instance::Instance;
|
||||
use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
|
||||
use crate::metrics::{
|
||||
OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS,
|
||||
};
|
||||
|
||||
const TRACE_INGEST_CHUNK_SIZE: usize = 64;
|
||||
const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum ChunkFailureReaction {
|
||||
RetryPerSpan,
|
||||
DiscardChunk,
|
||||
Propagate,
|
||||
}
|
||||
|
||||
impl ChunkFailureReaction {
|
||||
fn as_metric_label(self) -> &'static str {
|
||||
match self {
|
||||
Self::RetryPerSpan => "retry_per_span",
|
||||
Self::DiscardChunk => "discard_chunk",
|
||||
Self::Propagate => "propagate_failure",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TraceChunkIngestContext<'a> {
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
pipeline: &'a PipelineWay,
|
||||
pipeline_params: &'a GreptimePipelineParams,
|
||||
table_name: &'a str,
|
||||
is_trace_v1_model: bool,
|
||||
}
|
||||
|
||||
struct TraceIngestState {
|
||||
aux_data: TraceAuxData,
|
||||
outcome: TraceIngestOutcome,
|
||||
failure_messages: Vec<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OpenTelemetryProtocolHandler for Instance {
|
||||
@@ -116,7 +158,7 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<Output> {
|
||||
) -> ServerResult<TraceIngestOutcome> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
@@ -128,32 +170,16 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
|
||||
interceptor_ref.pre_execute(ctx.clone())?;
|
||||
|
||||
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
|
||||
|
||||
let (mut requests, rows) = otlp::trace::to_grpc_insert_requests(
|
||||
request,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
&ctx,
|
||||
let spans = otlp::trace::span::parse(request);
|
||||
self.ingest_trace_spans(
|
||||
pipeline_handler,
|
||||
)?;
|
||||
|
||||
OTLP_TRACES_ROWS.inc_by(rows as u64);
|
||||
|
||||
if is_trace_v1_model {
|
||||
self.reconcile_trace_column_types(&mut requests, &ctx)
|
||||
.await?;
|
||||
self.handle_trace_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
} else {
|
||||
self.handle_log_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
&pipeline,
|
||||
&pipeline_params,
|
||||
table_name,
|
||||
spans,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -210,6 +236,316 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
/// Ingest OTLP trace spans with chunk-level writes and span-level fallback on
|
||||
/// deterministic chunk failures.
|
||||
async fn ingest_trace_spans(
|
||||
&self,
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
pipeline: &PipelineWay,
|
||||
pipeline_params: &GreptimePipelineParams,
|
||||
table_name: String,
|
||||
groups: Vec<TraceSpanGroup>,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<TraceIngestOutcome> {
|
||||
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
|
||||
let ingest_ctx = TraceChunkIngestContext {
|
||||
pipeline_handler,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name: &table_name,
|
||||
is_trace_v1_model,
|
||||
};
|
||||
let mut ingest_state = TraceIngestState {
|
||||
aux_data: TraceAuxData::default(),
|
||||
outcome: TraceIngestOutcome::default(),
|
||||
failure_messages: Vec::new(),
|
||||
};
|
||||
|
||||
for group in groups {
|
||||
let chunks = group
|
||||
.spans
|
||||
.into_iter()
|
||||
.chunks(TRACE_INGEST_CHUNK_SIZE)
|
||||
.into_iter()
|
||||
.map(|chunk| chunk.collect::<Vec<_>>())
|
||||
.collect::<Vec<_>>();
|
||||
for chunk in chunks {
|
||||
self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
OTLP_TRACES_ROWS.inc_by(ingest_state.outcome.accepted_spans as u64);
|
||||
|
||||
if !ingest_state.aux_data.is_empty() {
|
||||
// Auxiliary trace tables are derived from spans whose main-table
|
||||
// writes are already confirmed, so they never create new accepted
|
||||
// spans and they do not affect rejected span counts.
|
||||
let (aux_requests, _) = otlp::trace::to_grpc_insert_requests_for_aux_tables(
|
||||
std::mem::take(&mut ingest_state.aux_data),
|
||||
ingest_ctx.pipeline,
|
||||
ingest_ctx.table_name,
|
||||
)?;
|
||||
|
||||
if !aux_requests.inserts.is_empty() {
|
||||
match self
|
||||
.insert_trace_requests(aux_requests, ingest_ctx.is_trace_v1_model, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(output) => {
|
||||
Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
|
||||
}
|
||||
Err(err) => {
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
"aux_table_update_failed",
|
||||
format!(
|
||||
"Auxiliary trace tables were not fully updated ({})",
|
||||
err.status_code().as_ref()
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ingest_state.outcome.error_message = Self::finish_trace_failure_message(
|
||||
ingest_state.outcome.accepted_spans,
|
||||
ingest_state.outcome.rejected_spans,
|
||||
ingest_state.failure_messages,
|
||||
);
|
||||
|
||||
Ok(ingest_state.outcome)
|
||||
}
|
||||
|
||||
/// Ingest one owned trace chunk so successful spans can be moved into the
|
||||
/// accepted set without extra cloning.
|
||||
async fn ingest_trace_chunk(
|
||||
&self,
|
||||
ingest_ctx: &TraceChunkIngestContext<'_>,
|
||||
chunk: Vec<TraceSpan>,
|
||||
ctx: QueryContextRef,
|
||||
ingest_state: &mut TraceIngestState,
|
||||
) -> ServerResult<()> {
|
||||
// Try the fast path first so healthy batches keep their original
|
||||
// throughput and write amplification stays low.
|
||||
let (requests, chunk_rows) = otlp::trace::to_grpc_insert_requests_from_spans(
|
||||
&chunk,
|
||||
ingest_ctx.pipeline,
|
||||
ingest_ctx.pipeline_params,
|
||||
ingest_ctx.table_name,
|
||||
&ctx,
|
||||
ingest_ctx.pipeline_handler.clone(),
|
||||
)?;
|
||||
|
||||
match self
|
||||
.insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
|
||||
.await
|
||||
{
|
||||
Ok(output) => {
|
||||
Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
|
||||
ingest_state.outcome.accepted_spans += chunk_rows;
|
||||
for span in &chunk {
|
||||
ingest_state.aux_data.observe_span(span);
|
||||
}
|
||||
}
|
||||
Err(err) => match Self::classify_trace_chunk_failure(err.status_code()) {
|
||||
ChunkFailureReaction::RetryPerSpan => {
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
ChunkFailureReaction::RetryPerSpan.as_metric_label(),
|
||||
format!("Chunk fallback triggered by {}", err.status_code().as_ref()),
|
||||
);
|
||||
// Only deterministic failures are retried span by span.
|
||||
// This includes schemaless table or column creation paths for
|
||||
// trace ingestion. Ambiguous failures are handled below
|
||||
// without retrying because the chunk may already have been
|
||||
// ingested.
|
||||
self.ingest_trace_chunk_span_by_span(
|
||||
ingest_ctx,
|
||||
chunk,
|
||||
ctx.clone(),
|
||||
ingest_state,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
ChunkFailureReaction::DiscardChunk => {
|
||||
ingest_state.outcome.rejected_spans += chunk.len();
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
ChunkFailureReaction::DiscardChunk.as_metric_label(),
|
||||
format!(
|
||||
"Discarded {} spans after ambiguous chunk failure ({})",
|
||||
chunk.len(),
|
||||
err.status_code().as_ref()
|
||||
),
|
||||
);
|
||||
// TODO(shuiyisong): Add an idempotent retry-safe recovery path for
|
||||
// ambiguous chunk failures such as timeout-like errors.
|
||||
}
|
||||
// Retryable or ambiguous failures must fail the request instead of
|
||||
// becoming partial success. This path is not retry-safe because the
|
||||
// chunk may already have been committed before the error surfaced.
|
||||
ChunkFailureReaction::Propagate => {
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
ChunkFailureReaction::Propagate.as_metric_label(),
|
||||
format!(
|
||||
"Propagating retryable chunk failure ({})",
|
||||
err.status_code().as_ref()
|
||||
),
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retry spans one by one only after a deterministic chunk failure.
|
||||
async fn ingest_trace_chunk_span_by_span(
|
||||
&self,
|
||||
ingest_ctx: &TraceChunkIngestContext<'_>,
|
||||
chunk: Vec<TraceSpan>,
|
||||
ctx: QueryContextRef,
|
||||
ingest_state: &mut TraceIngestState,
|
||||
) -> ServerResult<()> {
|
||||
for span in chunk {
|
||||
let (requests, rows) = otlp::trace::to_grpc_insert_requests_from_spans(
|
||||
std::slice::from_ref(&span),
|
||||
ingest_ctx.pipeline,
|
||||
ingest_ctx.pipeline_params,
|
||||
ingest_ctx.table_name,
|
||||
&ctx,
|
||||
ingest_ctx.pipeline_handler.clone(),
|
||||
)?;
|
||||
|
||||
match self
|
||||
.insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
|
||||
.await
|
||||
{
|
||||
Ok(output) => {
|
||||
Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
|
||||
ingest_state.outcome.accepted_spans += rows;
|
||||
ingest_state.aux_data.observe_span(&span);
|
||||
}
|
||||
Err(err) => {
|
||||
if Self::should_propagate_trace_span_failure(err.status_code()) {
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
ChunkFailureReaction::Propagate.as_metric_label(),
|
||||
format!(
|
||||
"Propagating retryable span failure for {}:{} ({})",
|
||||
span.trace_id,
|
||||
span.span_id,
|
||||
err.status_code().as_ref()
|
||||
),
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
ingest_state.outcome.rejected_spans += 1;
|
||||
Self::push_trace_failure_message(
|
||||
&mut ingest_state.failure_messages,
|
||||
"span_rejected",
|
||||
format!(
|
||||
"Rejected span {}:{} ({})",
|
||||
span.trace_id,
|
||||
span.span_id,
|
||||
err.status_code().as_ref()
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reconcile and insert one trace request batch.
|
||||
async fn insert_trace_requests(
|
||||
&self,
|
||||
mut requests: RowInsertRequests,
|
||||
is_trace_v1_model: bool,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<Output> {
|
||||
if is_trace_v1_model {
|
||||
self.reconcile_trace_column_types(&mut requests, &ctx)
|
||||
.await?;
|
||||
self.handle_trace_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
} else {
|
||||
self.handle_log_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
fn classify_trace_chunk_failure(status: StatusCode) -> ChunkFailureReaction {
|
||||
match status {
|
||||
StatusCode::InvalidArguments
|
||||
| StatusCode::InvalidSyntax
|
||||
| StatusCode::Unsupported
|
||||
| StatusCode::TableNotFound
|
||||
| StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan,
|
||||
StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk,
|
||||
StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate,
|
||||
_ if status.is_retryable() => ChunkFailureReaction::Propagate,
|
||||
_ => ChunkFailureReaction::DiscardChunk,
|
||||
}
|
||||
}
|
||||
|
||||
fn should_propagate_trace_span_failure(status: StatusCode) -> bool {
|
||||
matches!(
|
||||
Self::classify_trace_chunk_failure(status),
|
||||
ChunkFailureReaction::Propagate
|
||||
)
|
||||
}
|
||||
|
||||
fn add_trace_write_cost(outcome: &mut TraceIngestOutcome, cost: usize) {
|
||||
outcome.write_cost += cost;
|
||||
}
|
||||
|
||||
fn push_trace_failure_message(messages: &mut Vec<String>, label: &str, message: String) {
|
||||
OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).inc();
|
||||
|
||||
if messages.len() < TRACE_FAILURE_MESSAGE_LIMIT {
|
||||
messages.push(message);
|
||||
} else if messages.len() == TRACE_FAILURE_MESSAGE_LIMIT {
|
||||
tracing::debug!(
|
||||
label,
|
||||
limit = TRACE_FAILURE_MESSAGE_LIMIT,
|
||||
"Trace ingest failure message limit reached; suppressing additional failure details"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_trace_failure_message(
|
||||
accepted_spans: usize,
|
||||
rejected_spans: usize,
|
||||
messages: Vec<String>,
|
||||
) -> Option<String> {
|
||||
if rejected_spans == 0 && messages.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut summary = format!(
|
||||
"Accepted {} spans, rejected {} spans",
|
||||
accepted_spans, rejected_spans
|
||||
);
|
||||
|
||||
if !messages.is_empty() {
|
||||
summary.push_str(": ");
|
||||
summary.push_str(&messages.join("; "));
|
||||
}
|
||||
|
||||
Some(summary)
|
||||
}
|
||||
|
||||
/// Picks the final datatype for one trace column.
|
||||
///
|
||||
/// Existing table schema is authoritative when present. Otherwise we resolve the
|
||||
@@ -428,3 +764,163 @@ fn push_observed_trace_type(observed_types: &mut Vec<ColumnDataType>, datatype:
|
||||
observed_types.push(datatype);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_error::status_code::StatusCode;
|
||||
use servers::query_handler::TraceIngestOutcome;
|
||||
|
||||
use super::{ChunkFailureReaction, Instance};
|
||||
use crate::metrics::OTLP_TRACES_FAILURE_COUNT;
|
||||
|
||||
#[test]
|
||||
fn test_classify_trace_chunk_failure() {
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::InvalidArguments),
|
||||
ChunkFailureReaction::RetryPerSpan
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::InvalidSyntax),
|
||||
ChunkFailureReaction::RetryPerSpan
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::Unsupported),
|
||||
ChunkFailureReaction::RetryPerSpan
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::TableColumnNotFound),
|
||||
ChunkFailureReaction::RetryPerSpan
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::TableNotFound),
|
||||
ChunkFailureReaction::RetryPerSpan
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::DatabaseNotFound),
|
||||
ChunkFailureReaction::DiscardChunk
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::DeadlineExceeded),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::Cancelled),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::StorageUnavailable),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::Internal),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::RegionNotReady),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::TableUnavailable),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::RegionBusy),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::RuntimeResourcesExhausted),
|
||||
ChunkFailureReaction::Propagate
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_classify_trace_span_failure() {
|
||||
assert!(Instance::should_propagate_trace_span_failure(
|
||||
StatusCode::DeadlineExceeded
|
||||
));
|
||||
assert!(Instance::should_propagate_trace_span_failure(
|
||||
StatusCode::StorageUnavailable
|
||||
));
|
||||
assert!(!Instance::should_propagate_trace_span_failure(
|
||||
StatusCode::InvalidArguments
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_trace_write_cost() {
|
||||
let mut outcome = TraceIngestOutcome::default();
|
||||
Instance::add_trace_write_cost(&mut outcome, 3);
|
||||
Instance::add_trace_write_cost(&mut outcome, 5);
|
||||
assert_eq!(outcome.write_cost, 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finish_trace_failure_message() {
|
||||
let message = Instance::finish_trace_failure_message(
|
||||
3,
|
||||
2,
|
||||
vec!["Rejected span trace:span (InvalidArguments)".to_string()],
|
||||
)
|
||||
.unwrap();
|
||||
assert!(message.contains("Accepted 3 spans, rejected 2 spans"));
|
||||
assert!(message.contains("Rejected span trace:span"));
|
||||
|
||||
assert_eq!(Instance::finish_trace_failure_message(2, 0, vec![]), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finish_trace_failure_message_without_detail_messages() {
|
||||
assert_eq!(
|
||||
Instance::finish_trace_failure_message(0, 2, vec![]),
|
||||
Some("Accepted 0 spans, rejected 2 spans".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_trace_failure_message_increments_labeled_counter() {
|
||||
let label = "retry_per_span_counter_test";
|
||||
let initial = OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get();
|
||||
let mut messages = Vec::new();
|
||||
|
||||
Instance::push_trace_failure_message(
|
||||
&mut messages,
|
||||
label,
|
||||
"Chunk fallback triggered by InvalidArguments".to_string(),
|
||||
);
|
||||
|
||||
assert_eq!(messages.len(), 1);
|
||||
assert_eq!(
|
||||
OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get(),
|
||||
initial + 1
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_trace_failure_message_caps_recorded_messages() {
|
||||
let label = "retry_per_span_limit_test";
|
||||
let mut messages = Vec::new();
|
||||
|
||||
for idx in 0..=4 {
|
||||
Instance::push_trace_failure_message(&mut messages, label, format!("failure-{idx}"));
|
||||
}
|
||||
|
||||
assert_eq!(messages.len(), 4);
|
||||
assert_eq!(
|
||||
messages,
|
||||
vec![
|
||||
"failure-0".to_string(),
|
||||
"failure-1".to_string(),
|
||||
"failure-2".to_string(),
|
||||
"failure-3".to_string()
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_classify_trace_chunk_failure_defaults_to_discard() {
|
||||
assert_eq!(
|
||||
Instance::classify_trace_chunk_failure(StatusCode::Unknown),
|
||||
ChunkFailureReaction::DiscardChunk
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,6 +52,14 @@ lazy_static! {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
/// The number of OpenTelemetry trace ingest failures on the frontend node.
|
||||
pub static ref OTLP_TRACES_FAILURE_COUNT: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_frontend_otlp_traces_failure_count",
|
||||
"frontend otlp trace ingest failure count",
|
||||
&["label"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
/// The number of OpenTelemetry logs send by frontend node.
|
||||
pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!(
|
||||
"greptime_frontend_otlp_logs_rows",
|
||||
|
||||
@@ -29,7 +29,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
|
||||
};
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::{
|
||||
ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
};
|
||||
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use pipeline::PipelineWay;
|
||||
@@ -175,11 +175,16 @@ pub async fn traces(
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
.map(|o| OtlpResponse {
|
||||
.map(|outcome| OtlpResponse {
|
||||
resp_body: ExportTraceServiceResponse {
|
||||
partial_success: None,
|
||||
partial_success: outcome.error_message.map(|error_message| {
|
||||
ExportTracePartialSuccess {
|
||||
rejected_spans: outcome.rejected_spans as i64,
|
||||
error_message,
|
||||
}
|
||||
}),
|
||||
},
|
||||
write_cost: o.meta.cost,
|
||||
write_cost: outcome.write_cost,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -18,15 +18,17 @@ pub mod span;
|
||||
pub mod v0;
|
||||
pub mod v1;
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::RowInsertRequests;
|
||||
pub use common_catalog::consts::{
|
||||
PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use crate::error::{NotSupportedSnafu, Result};
|
||||
use crate::otlp::trace::span::TraceSpan;
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
|
||||
// column names
|
||||
@@ -65,27 +67,58 @@ pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
|
||||
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
|
||||
pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
pub fn to_grpc_insert_requests(
|
||||
request: ExportTraceServiceRequest,
|
||||
pipeline: PipelineWay,
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
/// Deduplicated auxiliary trace entities derived from successfully ingested
|
||||
/// spans.
|
||||
///
|
||||
/// The main trace table is written first. Once a span is confirmed accepted, we
|
||||
/// record the service and operation tuples here so the auxiliary tables can be
|
||||
/// updated separately without affecting span acceptance accounting.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TraceAuxData {
|
||||
pub services: HashSet<String>,
|
||||
pub operations: HashSet<(String, String, String)>,
|
||||
}
|
||||
|
||||
impl TraceAuxData {
|
||||
/// Records the auxiliary service and operation rows implied by one accepted
|
||||
/// span.
|
||||
pub fn observe_span(&mut self, span: &TraceSpan) {
|
||||
if let Some(service_name) = &span.service_name {
|
||||
self.services.insert(service_name.clone());
|
||||
self.operations.insert((
|
||||
service_name.clone(),
|
||||
span.span_name.clone(),
|
||||
span.span_kind.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when no auxiliary table updates are needed.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.services.is_empty() && self.operations.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a subset of trace spans to GreptimeDB row insert requests.
|
||||
pub fn to_grpc_insert_requests_from_spans(
|
||||
spans: &[TraceSpan],
|
||||
pipeline: &PipelineWay,
|
||||
pipeline_params: &GreptimePipelineParams,
|
||||
table_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
match pipeline {
|
||||
PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests(
|
||||
request,
|
||||
PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests(
|
||||
spans,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
query_ctx,
|
||||
pipeline_handler,
|
||||
),
|
||||
PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_insert_requests(
|
||||
request,
|
||||
PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests(
|
||||
spans,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
@@ -98,3 +131,23 @@ pub fn to_grpc_insert_requests(
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build insert requests for the auxiliary trace tables derived from accepted
|
||||
/// spans.
|
||||
///
|
||||
/// "Aux" here refers to the trace service and trace operation tables, not the
|
||||
/// main trace span table itself.
|
||||
pub fn to_grpc_insert_requests_for_aux_tables(
|
||||
aux_data: TraceAuxData,
|
||||
pipeline: &PipelineWay,
|
||||
table_name: &str,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
match pipeline {
|
||||
PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name),
|
||||
PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name),
|
||||
_ => NotSupportedSnafu {
|
||||
feat: "Unsupported pipeline for trace",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,6 +53,18 @@ pub struct TraceSpan {
|
||||
|
||||
pub type TraceSpans = Vec<TraceSpan>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TraceSpanGroup {
|
||||
pub service_name: Option<String>,
|
||||
pub resource_attributes: Attributes,
|
||||
pub scope_name: String,
|
||||
pub scope_version: String,
|
||||
pub scope_attributes: Attributes,
|
||||
pub spans: TraceSpans,
|
||||
}
|
||||
|
||||
pub type TraceSpanGroups = Vec<TraceSpanGroup>;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SpanLink {
|
||||
pub trace_id: String,
|
||||
@@ -241,14 +253,13 @@ pub fn status_to_string(status: &Option<Status>) -> (String, String) {
|
||||
/// See
|
||||
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
|
||||
/// for data structure of OTLP traces.
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
let span_size = request
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpanGroups {
|
||||
let group_size = request
|
||||
.resource_spans
|
||||
.iter()
|
||||
.flat_map(|res| res.scope_spans.iter())
|
||||
.flat_map(|scope| scope.spans.iter())
|
||||
.count();
|
||||
let mut spans = Vec::with_capacity(span_size);
|
||||
let mut groups = Vec::with_capacity(group_size);
|
||||
for resource_spans in request.resource_spans {
|
||||
let resource_attrs = resource_spans
|
||||
.resource
|
||||
@@ -268,6 +279,7 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
|
||||
for scope_spans in resource_spans.scope_spans {
|
||||
let scope = scope_spans.scope.unwrap_or_default();
|
||||
let mut spans = Vec::with_capacity(scope_spans.spans.len());
|
||||
for span in scope_spans.spans {
|
||||
spans.push(parse_span(
|
||||
service_name.clone(),
|
||||
@@ -276,16 +288,47 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
span,
|
||||
));
|
||||
}
|
||||
groups.push(TraceSpanGroup {
|
||||
service_name: service_name.clone(),
|
||||
resource_attributes: Attributes::from(&resource_attrs[..]),
|
||||
scope_name: scope.name,
|
||||
scope_version: scope.version,
|
||||
scope_attributes: Attributes::from(scope.attributes),
|
||||
spans,
|
||||
});
|
||||
}
|
||||
}
|
||||
spans
|
||||
groups
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use opentelemetry_proto::tonic::trace::v1::Status;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::{
|
||||
AnyValue, InstrumentationScope, KeyValue, any_value,
|
||||
};
|
||||
use opentelemetry_proto::tonic::resource::v1::Resource;
|
||||
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status};
|
||||
|
||||
use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string};
|
||||
use crate::otlp::trace::KEY_SERVICE_NAME;
|
||||
use crate::otlp::trace::span::{bytes_to_hex_string, parse, status_to_string};
|
||||
|
||||
fn make_kv(key: &str, value: &str) -> KeyValue {
|
||||
KeyValue {
|
||||
key: key.to_string(),
|
||||
value: Some(AnyValue {
|
||||
value: Some(any_value::Value::StringValue(value.to_string())),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_span(trace_id: u8, span_id: u8) -> Span {
|
||||
Span {
|
||||
trace_id: vec![trace_id; 16],
|
||||
span_id: vec![span_id; 8],
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_hex_string() {
|
||||
@@ -315,4 +358,62 @@ mod tests {
|
||||
status_to_string(&Some(status)),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_preserves_resource_scope_groups() {
|
||||
let request = ExportTraceServiceRequest {
|
||||
resource_spans: vec![
|
||||
ResourceSpans {
|
||||
resource: Some(Resource {
|
||||
attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-a")],
|
||||
..Default::default()
|
||||
}),
|
||||
scope_spans: vec![
|
||||
ScopeSpans {
|
||||
scope: Some(InstrumentationScope {
|
||||
name: "scope-1".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
spans: vec![make_span(0x11, 0x21), make_span(0x12, 0x22)],
|
||||
..Default::default()
|
||||
},
|
||||
ScopeSpans {
|
||||
scope: Some(InstrumentationScope {
|
||||
name: "scope-2".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
spans: vec![make_span(0x13, 0x23)],
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
..Default::default()
|
||||
},
|
||||
ResourceSpans {
|
||||
resource: Some(Resource {
|
||||
attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-b")],
|
||||
..Default::default()
|
||||
}),
|
||||
scope_spans: vec![ScopeSpans {
|
||||
scope: Some(InstrumentationScope {
|
||||
name: "scope-3".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
spans: vec![make_span(0x14, 0x24)],
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
let groups = parse(request);
|
||||
assert_eq!(groups.len(), 3);
|
||||
assert_eq!(groups[0].service_name.as_deref(), Some("svc-a"));
|
||||
assert_eq!(groups[0].scope_name, "scope-1");
|
||||
assert_eq!(groups[0].spans.len(), 2);
|
||||
assert_eq!(groups[1].scope_name, "scope-2");
|
||||
assert_eq!(groups[1].spans.len(), 1);
|
||||
assert_eq!(groups[2].service_name.as_deref(), Some("svc-b"));
|
||||
assert_eq!(groups[2].scope_name, "scope-3");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,16 +18,16 @@ use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
|
||||
use common_grpc::precision::Precision;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::otlp::trace::span::{TraceSpan, parse};
|
||||
use crate::otlp::trace::span::TraceSpan;
|
||||
use crate::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
|
||||
SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
|
||||
SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
|
||||
TraceAuxData,
|
||||
};
|
||||
use crate::otlp::utils::{make_column_data, make_string_column_data};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
@@ -38,56 +38,52 @@ const APPROXIMATE_COLUMN_COUNT: usize = 24;
|
||||
// Use a timestamp(2100-01-01 00:00:00) as large as possible.
|
||||
const MAX_TIMESTAMP: i64 = 4102444800000000000;
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
pub fn v0_to_grpc_insert_requests(
|
||||
request: ExportTraceServiceRequest,
|
||||
_pipeline: PipelineWay,
|
||||
_pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
/// Converts trace spans into row insert requests for the main v0 trace table.
|
||||
///
|
||||
/// Auxiliary service and operation table writes are built separately so the
|
||||
/// caller can update them only after the main span write succeeds.
|
||||
pub fn v0_to_grpc_main_insert_requests(
|
||||
spans: &[TraceSpan],
|
||||
_pipeline: &PipelineWay,
|
||||
_pipeline_params: &GreptimePipelineParams,
|
||||
table_name: &str,
|
||||
_query_ctx: &QueryContextRef,
|
||||
_pipeline_handler: PipelineHandlerRef,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let spans = parse(request);
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let trace_writer = build_trace_table_data(spans)?;
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
|
||||
/// Builds the row-oriented payload for the main v0 trace table.
|
||||
pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
|
||||
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
|
||||
for span in spans.iter().cloned() {
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
|
||||
Ok(trace_writer)
|
||||
}
|
||||
|
||||
/// Builds row insert requests for the v0 trace auxiliary tables.
|
||||
pub fn build_aux_table_requests(
|
||||
aux_data: TraceAuxData,
|
||||
table_name: &str,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
|
||||
let mut services = HashSet::new();
|
||||
let mut operations = HashSet::new();
|
||||
for span in spans {
|
||||
if let Some(service_name) = &span.service_name {
|
||||
// Only insert the service name if it's not already in the set.
|
||||
if !services.contains(service_name) {
|
||||
services.insert(service_name.clone());
|
||||
}
|
||||
|
||||
// Collect operations (service_name + span_name + span_kind).
|
||||
let operation = (
|
||||
service_name.clone(),
|
||||
span.span_name.clone(),
|
||||
span.span_kind.clone(),
|
||||
);
|
||||
if !operations.contains(&operation) {
|
||||
operations.insert(operation);
|
||||
}
|
||||
}
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
write_trace_services_to_row(&mut trace_services_writer, services)?;
|
||||
write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
|
||||
write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
|
||||
write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
|
||||
|
||||
multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
|
||||
multi_table_writer.add_table_data(
|
||||
trace_services_table_name(&table_name),
|
||||
trace_services_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(
|
||||
trace_operations_table_name(&table_name),
|
||||
trace_operations_table_name(table_name),
|
||||
trace_operations_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
|
||||
@@ -232,3 +228,63 @@ fn write_trace_operations_to_row(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{build_aux_table_requests, build_trace_table_data};
|
||||
use crate::otlp::trace::TraceAuxData;
|
||||
use crate::otlp::trace::attributes::Attributes;
|
||||
use crate::otlp::trace::span::{SpanEvents, SpanLinks, TraceSpan};
|
||||
|
||||
fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
|
||||
TraceSpan {
|
||||
service_name: Some(service_name.to_string()),
|
||||
trace_id: trace_id.to_string(),
|
||||
span_id: span_id.to_string(),
|
||||
parent_span_id: None,
|
||||
resource_attributes: Attributes::from(vec![]),
|
||||
scope_name: "scope".to_string(),
|
||||
scope_version: "v1".to_string(),
|
||||
scope_attributes: Attributes::from(vec![]),
|
||||
trace_state: String::new(),
|
||||
span_name: "op".to_string(),
|
||||
span_kind: "SPAN_KIND_SERVER".to_string(),
|
||||
span_status_code: "STATUS_CODE_UNSET".to_string(),
|
||||
span_status_message: String::new(),
|
||||
span_attributes: Attributes::from(vec![]),
|
||||
span_events: SpanEvents::from(vec![]),
|
||||
span_links: SpanLinks::from(vec![]),
|
||||
start_in_nanosecond: 1,
|
||||
end_in_nanosecond: 2,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_trace_table_data_from_span_subset() {
|
||||
let spans = [
|
||||
make_span("svc-a", "trace-a", "span-a"),
|
||||
make_span("svc-b", "trace-b", "span-b"),
|
||||
];
|
||||
|
||||
let writer = build_trace_table_data(&spans[..1]).unwrap();
|
||||
let (_, rows) = writer.into_schema_and_rows();
|
||||
assert_eq!(rows.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_aux_table_requests_deduplicates_services_and_operations() {
|
||||
let spans = vec![
|
||||
make_span("svc-a", "trace-a", "span-a"),
|
||||
make_span("svc-a", "trace-b", "span-b"),
|
||||
];
|
||||
let mut aux_data = TraceAuxData::default();
|
||||
for span in &spans {
|
||||
aux_data.observe_span(span);
|
||||
}
|
||||
|
||||
let (requests, total_rows) =
|
||||
build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();
|
||||
assert_eq!(requests.inserts.len(), 2);
|
||||
assert_eq!(total_rows, 2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,19 +18,18 @@ use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests, Value};
|
||||
use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
|
||||
use common_grpc::precision::Precision;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::otlp::trace::attributes::Attributes;
|
||||
use crate::otlp::trace::span::{TraceSpan, parse};
|
||||
use crate::otlp::trace::span::TraceSpan;
|
||||
use crate::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
|
||||
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
|
||||
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
|
||||
TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
|
||||
TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, TraceAuxData,
|
||||
};
|
||||
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
@@ -41,64 +40,52 @@ const APPROXIMATE_COLUMN_COUNT: usize = 30;
|
||||
// Use a timestamp(2100-01-01 00:00:00) as large as possible.
|
||||
const MAX_TIMESTAMP: i64 = 4102444800000000000;
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
/// Converts trace spans into row insert requests for the main v1 trace table.
|
||||
///
|
||||
/// Compared with v0, this v1 implementation:
|
||||
/// 1. flattens all attribute data into columns.
|
||||
/// 2. treat `span_id` and `parent_trace_id` as fields.
|
||||
/// 3. removed `service_name` column because it's already in
|
||||
/// `resource_attributes.service_name`
|
||||
///
|
||||
/// For other compound data structures like span_links and span_events here we
|
||||
/// are still using `json` data structure.
|
||||
pub fn v1_to_grpc_insert_requests(
|
||||
request: ExportTraceServiceRequest,
|
||||
_pipeline: PipelineWay,
|
||||
_pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
/// Auxiliary service and operation table writes are built separately so the
|
||||
/// caller can update them only after the main span write succeeds.
|
||||
pub fn v1_to_grpc_main_insert_requests(
|
||||
spans: &[TraceSpan],
|
||||
_pipeline: &PipelineWay,
|
||||
_pipeline_params: &GreptimePipelineParams,
|
||||
table_name: &str,
|
||||
_query_ctx: &QueryContextRef,
|
||||
_pipeline_handler: PipelineHandlerRef,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let spans = parse(request);
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let trace_writer = build_trace_table_data(spans)?;
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
|
||||
/// Builds the row-oriented payload for the main v1 trace table.
|
||||
pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
|
||||
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
|
||||
for span in spans.iter().cloned() {
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
|
||||
Ok(trace_writer)
|
||||
}
|
||||
|
||||
/// Builds row insert requests for the v1 trace auxiliary tables.
|
||||
pub fn build_aux_table_requests(
|
||||
aux_data: TraceAuxData,
|
||||
table_name: &str,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
|
||||
let mut services = HashSet::new();
|
||||
let mut operations = HashSet::new();
|
||||
for span in spans {
|
||||
if let Some(service_name) = &span.service_name {
|
||||
// Only insert the service name if it's not already in the set.
|
||||
if !services.contains(service_name) {
|
||||
services.insert(service_name.clone());
|
||||
}
|
||||
|
||||
// Only insert the operation if it's not already in the set.
|
||||
let operation = (
|
||||
service_name.clone(),
|
||||
span.span_name.clone(),
|
||||
span.span_kind.clone(),
|
||||
);
|
||||
if !operations.contains(&operation) {
|
||||
operations.insert(operation);
|
||||
}
|
||||
}
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
write_trace_services_to_row(&mut trace_services_writer, services)?;
|
||||
write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
|
||||
write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
|
||||
write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
|
||||
|
||||
multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
|
||||
multi_table_writer.add_table_data(
|
||||
trace_services_table_name(&table_name),
|
||||
trace_services_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(
|
||||
trace_operations_table_name(&table_name),
|
||||
trace_operations_table_name(table_name),
|
||||
trace_operations_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
@@ -319,7 +306,9 @@ mod tests {
|
||||
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
|
||||
|
||||
use super::*;
|
||||
use crate::otlp::trace::TraceAuxData;
|
||||
use crate::otlp::trace::attributes::Attributes;
|
||||
use crate::otlp::trace::span::{SpanEvents, SpanLinks};
|
||||
use crate::row_writer::TableData;
|
||||
|
||||
fn make_kv(key: &str, value: OtlpValue) -> KeyValue {
|
||||
@@ -329,6 +318,29 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
|
||||
TraceSpan {
|
||||
service_name: Some(service_name.to_string()),
|
||||
trace_id: trace_id.to_string(),
|
||||
span_id: span_id.to_string(),
|
||||
parent_span_id: None,
|
||||
resource_attributes: Attributes::from(vec![]),
|
||||
scope_name: "scope".to_string(),
|
||||
scope_version: "v1".to_string(),
|
||||
scope_attributes: Attributes::from(vec![]),
|
||||
trace_state: String::new(),
|
||||
span_name: "op".to_string(),
|
||||
span_kind: "SPAN_KIND_SERVER".to_string(),
|
||||
span_status_code: "STATUS_CODE_UNSET".to_string(),
|
||||
span_status_message: String::new(),
|
||||
span_attributes: Attributes::from(vec![]),
|
||||
span_events: SpanEvents::from(vec![]),
|
||||
span_links: SpanLinks::from(vec![]),
|
||||
start_in_nanosecond: 1,
|
||||
end_in_nanosecond: 2,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_keep_mixed_numeric_values_until_frontend_reconciliation() {
|
||||
let mut writer = TableData::new(4, 2);
|
||||
@@ -520,5 +532,22 @@ mod tests {
|
||||
Some(ValueData::StringValue("false".to_string()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_aux_table_requests_deduplicates_services_and_operations() {
|
||||
let spans = vec![
|
||||
make_span("svc-a", "trace-a", "span-a"),
|
||||
make_span("svc-a", "trace-b", "span-b"),
|
||||
];
|
||||
let mut aux_data = TraceAuxData::default();
|
||||
for span in &spans {
|
||||
aux_data.observe_span(span);
|
||||
}
|
||||
|
||||
let (requests, total_rows) =
|
||||
build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();
|
||||
assert_eq!(requests.inserts.len(), 2);
|
||||
assert_eq!(total_rows, 2);
|
||||
}
|
||||
// Conversion matrix coverage lives in the shared coercion helper tests.
|
||||
}
|
||||
|
||||
@@ -63,6 +63,14 @@ pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
|
||||
pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
|
||||
pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct TraceIngestOutcome {
|
||||
pub write_cost: usize,
|
||||
pub accepted_spans: usize,
|
||||
pub rejected_spans: usize,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait InfluxdbLineProtocolHandler {
|
||||
/// A successful request will not return a response.
|
||||
@@ -123,7 +131,7 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output>;
|
||||
) -> Result<TraceIngestOutcome>;
|
||||
|
||||
async fn logs(
|
||||
&self,
|
||||
|
||||
@@ -40,7 +40,9 @@ use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAd
|
||||
use loki_proto::prost_types::Timestamp;
|
||||
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::{
|
||||
ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
};
|
||||
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
|
||||
use prost::Message;
|
||||
use serde_json::{Value, json};
|
||||
@@ -5572,26 +5574,83 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
],
|
||||
);
|
||||
let res = send_trace_v1_req(&client, abort_table_name, abort_req, false).await;
|
||||
assert_eq!(StatusCode::BAD_REQUEST, res.status());
|
||||
let body: Value = res.json().await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap();
|
||||
let partial_success = body.partial_success.as_ref().unwrap();
|
||||
assert_eq!(partial_success.rejected_spans, 1);
|
||||
assert!(
|
||||
body["error"].as_str().unwrap().contains(
|
||||
"failed to coerce trace column 'span_attributes.attr_int' in table 'trace_type_abort'"
|
||||
partial_success
|
||||
.error_message
|
||||
.contains("Accepted 1 spans, rejected 1 spans"),
|
||||
"unexpected partial success body: {body:?}"
|
||||
);
|
||||
assert!(
|
||||
partial_success.error_message.contains(
|
||||
"Rejected span 00000000000000000000000000000013:0000000000000013 (InvalidArguments)"
|
||||
),
|
||||
"unexpected error body: {body}"
|
||||
"unexpected partial success body: {body:?}"
|
||||
);
|
||||
|
||||
validate_data(
|
||||
"otlp_traces_v1_type_abort_rows",
|
||||
&client,
|
||||
&format!(
|
||||
"select trace_id, \"span_attributes.attr_int\" from {} order by trace_id;",
|
||||
"select trace_id, \"span_attributes.attr_int\" from {} order by trace_id",
|
||||
abort_table_name
|
||||
),
|
||||
r#"[["00000000000000000000000000000011",10]]"#,
|
||||
r#"[["00000000000000000000000000000011",10],["00000000000000000000000000000012",20]]"#,
|
||||
)
|
||||
.await;
|
||||
|
||||
let chunk_failure_req = make_trace_v1_request(
|
||||
"type-discard",
|
||||
vec![
|
||||
make_trace_v1_span(
|
||||
"00000000000000000000000000000021",
|
||||
"0000000000000021",
|
||||
"discard-one",
|
||||
1_736_480_942_445_400_000,
|
||||
1_736_480_942_445_500_000,
|
||||
vec![make_string_attr("attr_text", "alpha")],
|
||||
),
|
||||
make_trace_v1_span(
|
||||
"00000000000000000000000000000022",
|
||||
"0000000000000022",
|
||||
"discard-two",
|
||||
1_736_480_942_445_600_000,
|
||||
1_736_480_942_445_700_000,
|
||||
vec![make_string_attr("attr_text", "beta")],
|
||||
),
|
||||
],
|
||||
);
|
||||
let res = send_trace_v1_req_with_db(
|
||||
&client,
|
||||
"nonexistent",
|
||||
"trace_chunk_discard",
|
||||
chunk_failure_req,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap();
|
||||
let partial_success = body.partial_success.as_ref().unwrap();
|
||||
assert_eq!(partial_success.rejected_spans, 2);
|
||||
assert!(
|
||||
partial_success
|
||||
.error_message
|
||||
.contains("Accepted 0 spans, rejected 2 spans"),
|
||||
"unexpected partial success body: {body:?}"
|
||||
);
|
||||
assert!(
|
||||
partial_success
|
||||
.error_message
|
||||
.contains("Chunk fallback triggered by")
|
||||
|| partial_success
|
||||
.error_message
|
||||
.contains("Discarded 2 spans after ambiguous chunk failure"),
|
||||
"unexpected partial success body: {body:?}"
|
||||
);
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
@@ -7829,6 +7888,16 @@ async fn send_trace_v1_req(
|
||||
table_name: &str,
|
||||
req: ExportTraceServiceRequest,
|
||||
with_gzip: bool,
|
||||
) -> TestResponse {
|
||||
send_trace_v1_req_with_db(client, "public", table_name, req, with_gzip).await
|
||||
}
|
||||
|
||||
async fn send_trace_v1_req_with_db(
|
||||
client: &TestClient,
|
||||
db_name: &str,
|
||||
table_name: &str,
|
||||
req: ExportTraceServiceRequest,
|
||||
with_gzip: bool,
|
||||
) -> TestResponse {
|
||||
send_req(
|
||||
client,
|
||||
@@ -7845,6 +7914,10 @@ async fn send_trace_v1_req(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_str(table_name).unwrap(),
|
||||
),
|
||||
(
|
||||
GREPTIME_DB_HEADER_NAME.clone(),
|
||||
HeaderValue::from_str(db_name).unwrap(),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
req.encode_to_vec(),
|
||||
|
||||
Reference in New Issue
Block a user