diff --git a/Cargo.lock b/Cargo.lock index 54be9bbdcb..695f19b072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5255,6 +5255,7 @@ dependencies = [ "humantime", "humantime-serde", "hyper-util", + "itertools 0.14.0", "lazy_static", "log-query", "meta-client", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 03b0d35130..1b0ffe6e29 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -52,6 +52,7 @@ futures.workspace = true hostname.workspace = true humantime.workspace = true humantime-serde.workspace = true +itertools.workspace = true lazy_static.workspace = true log-query.workspace = true meta-client.workspace = true diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 59174aa89a..8cda639686 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -19,9 +19,11 @@ use api::v1::{ColumnDataType, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_telemetry::tracing; +use itertools::Itertools; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; @@ -30,17 +32,57 @@ use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; +use servers::otlp::trace::TraceAuxData; use servers::otlp::trace::coerce::{ coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type, trace_value_datatype, }; -use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef}; +use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup}; +use servers::query_handler::{ + OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome, +}; use session::context::QueryContextRef; use snafu::ResultExt; use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; -use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; +use crate::metrics::{ + OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS, +}; + +const TRACE_INGEST_CHUNK_SIZE: usize = 64; +const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ChunkFailureReaction { + RetryPerSpan, + DiscardChunk, + Propagate, +} + +impl ChunkFailureReaction { + fn as_metric_label(self) -> &'static str { + match self { + Self::RetryPerSpan => "retry_per_span", + Self::DiscardChunk => "discard_chunk", + Self::Propagate => "propagate_failure", + } + } +} + +struct TraceChunkIngestContext<'a> { + pipeline_handler: PipelineHandlerRef, + pipeline: &'a PipelineWay, + pipeline_params: &'a GreptimePipelineParams, + table_name: &'a str, + is_trace_v1_model: bool, +} + +struct TraceIngestState { + aux_data: TraceAuxData, + outcome: TraceIngestOutcome, + failure_messages: Vec, +} #[async_trait] impl OpenTelemetryProtocolHandler for Instance { @@ -116,7 +158,7 @@ impl OpenTelemetryProtocolHandler for Instance { pipeline_params: GreptimePipelineParams, table_name: String, ctx: QueryContextRef, - ) -> ServerResult { + ) -> ServerResult { self.plugins .get::() .as_ref() @@ -128,32 +170,16 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; - let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1); - - let (mut requests, rows) = otlp::trace::to_grpc_insert_requests( - request, - pipeline, - pipeline_params, - table_name, - &ctx, + let spans = otlp::trace::span::parse(request); + self.ingest_trace_spans( pipeline_handler, - )?; - - OTLP_TRACES_ROWS.inc_by(rows as u64); - - if is_trace_v1_model { - self.reconcile_trace_column_types(&mut requests, &ctx) - .await?; - self.handle_trace_inserts(requests, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu) - } else { - self.handle_log_inserts(requests, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu) - } + &pipeline, + &pipeline_params, + table_name, + spans, + ctx, + ) + .await } #[tracing::instrument(skip_all)] @@ -210,6 +236,316 @@ impl OpenTelemetryProtocolHandler for Instance { } impl Instance { + /// Ingest OTLP trace spans with chunk-level writes and span-level fallback on + /// deterministic chunk failures. + async fn ingest_trace_spans( + &self, + pipeline_handler: PipelineHandlerRef, + pipeline: &PipelineWay, + pipeline_params: &GreptimePipelineParams, + table_name: String, + groups: Vec, + ctx: QueryContextRef, + ) -> ServerResult { + let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1); + let ingest_ctx = TraceChunkIngestContext { + pipeline_handler, + pipeline, + pipeline_params, + table_name: &table_name, + is_trace_v1_model, + }; + let mut ingest_state = TraceIngestState { + aux_data: TraceAuxData::default(), + outcome: TraceIngestOutcome::default(), + failure_messages: Vec::new(), + }; + + for group in groups { + let chunks = group + .spans + .into_iter() + .chunks(TRACE_INGEST_CHUNK_SIZE) + .into_iter() + .map(|chunk| chunk.collect::>()) + .collect::>(); + for chunk in chunks { + self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state) + .await?; + } + } + + OTLP_TRACES_ROWS.inc_by(ingest_state.outcome.accepted_spans as u64); + + if !ingest_state.aux_data.is_empty() { + // Auxiliary trace tables are derived from spans whose main-table + // writes are already confirmed, so they never create new accepted + // spans and they do not affect rejected span counts. + let (aux_requests, _) = otlp::trace::to_grpc_insert_requests_for_aux_tables( + std::mem::take(&mut ingest_state.aux_data), + ingest_ctx.pipeline, + ingest_ctx.table_name, + )?; + + if !aux_requests.inserts.is_empty() { + match self + .insert_trace_requests(aux_requests, ingest_ctx.is_trace_v1_model, ctx) + .await + { + Ok(output) => { + Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost); + } + Err(err) => { + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + "aux_table_update_failed", + format!( + "Auxiliary trace tables were not fully updated ({})", + err.status_code().as_ref() + ), + ); + } + } + } + } + + ingest_state.outcome.error_message = Self::finish_trace_failure_message( + ingest_state.outcome.accepted_spans, + ingest_state.outcome.rejected_spans, + ingest_state.failure_messages, + ); + + Ok(ingest_state.outcome) + } + + /// Ingest one owned trace chunk so successful spans can be moved into the + /// accepted set without extra cloning. + async fn ingest_trace_chunk( + &self, + ingest_ctx: &TraceChunkIngestContext<'_>, + chunk: Vec, + ctx: QueryContextRef, + ingest_state: &mut TraceIngestState, + ) -> ServerResult<()> { + // Try the fast path first so healthy batches keep their original + // throughput and write amplification stays low. + let (requests, chunk_rows) = otlp::trace::to_grpc_insert_requests_from_spans( + &chunk, + ingest_ctx.pipeline, + ingest_ctx.pipeline_params, + ingest_ctx.table_name, + &ctx, + ingest_ctx.pipeline_handler.clone(), + )?; + + match self + .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone()) + .await + { + Ok(output) => { + Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost); + ingest_state.outcome.accepted_spans += chunk_rows; + for span in &chunk { + ingest_state.aux_data.observe_span(span); + } + } + Err(err) => match Self::classify_trace_chunk_failure(err.status_code()) { + ChunkFailureReaction::RetryPerSpan => { + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + ChunkFailureReaction::RetryPerSpan.as_metric_label(), + format!("Chunk fallback triggered by {}", err.status_code().as_ref()), + ); + // Only deterministic failures are retried span by span. + // This includes schemaless table or column creation paths for + // trace ingestion. Ambiguous failures are handled below + // without retrying because the chunk may already have been + // ingested. + self.ingest_trace_chunk_span_by_span( + ingest_ctx, + chunk, + ctx.clone(), + ingest_state, + ) + .await?; + } + ChunkFailureReaction::DiscardChunk => { + ingest_state.outcome.rejected_spans += chunk.len(); + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + ChunkFailureReaction::DiscardChunk.as_metric_label(), + format!( + "Discarded {} spans after ambiguous chunk failure ({})", + chunk.len(), + err.status_code().as_ref() + ), + ); + // TODO(shuiyisong): Add an idempotent retry-safe recovery path for + // ambiguous chunk failures such as timeout-like errors. + } + // Retryable or ambiguous failures must fail the request instead of + // becoming partial success. This path is not retry-safe because the + // chunk may already have been committed before the error surfaced. + ChunkFailureReaction::Propagate => { + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + ChunkFailureReaction::Propagate.as_metric_label(), + format!( + "Propagating retryable chunk failure ({})", + err.status_code().as_ref() + ), + ); + return Err(err); + } + }, + } + + Ok(()) + } + + /// Retry spans one by one only after a deterministic chunk failure. + async fn ingest_trace_chunk_span_by_span( + &self, + ingest_ctx: &TraceChunkIngestContext<'_>, + chunk: Vec, + ctx: QueryContextRef, + ingest_state: &mut TraceIngestState, + ) -> ServerResult<()> { + for span in chunk { + let (requests, rows) = otlp::trace::to_grpc_insert_requests_from_spans( + std::slice::from_ref(&span), + ingest_ctx.pipeline, + ingest_ctx.pipeline_params, + ingest_ctx.table_name, + &ctx, + ingest_ctx.pipeline_handler.clone(), + )?; + + match self + .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone()) + .await + { + Ok(output) => { + Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost); + ingest_state.outcome.accepted_spans += rows; + ingest_state.aux_data.observe_span(&span); + } + Err(err) => { + if Self::should_propagate_trace_span_failure(err.status_code()) { + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + ChunkFailureReaction::Propagate.as_metric_label(), + format!( + "Propagating retryable span failure for {}:{} ({})", + span.trace_id, + span.span_id, + err.status_code().as_ref() + ), + ); + return Err(err); + } + + ingest_state.outcome.rejected_spans += 1; + Self::push_trace_failure_message( + &mut ingest_state.failure_messages, + "span_rejected", + format!( + "Rejected span {}:{} ({})", + span.trace_id, + span.span_id, + err.status_code().as_ref() + ), + ); + } + } + } + + Ok(()) + } + + /// Reconcile and insert one trace request batch. + async fn insert_trace_requests( + &self, + mut requests: RowInsertRequests, + is_trace_v1_model: bool, + ctx: QueryContextRef, + ) -> ServerResult { + if is_trace_v1_model { + self.reconcile_trace_column_types(&mut requests, &ctx) + .await?; + self.handle_trace_inserts(requests, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } else { + self.handle_log_inserts(requests, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } + } + + fn classify_trace_chunk_failure(status: StatusCode) -> ChunkFailureReaction { + match status { + StatusCode::InvalidArguments + | StatusCode::InvalidSyntax + | StatusCode::Unsupported + | StatusCode::TableNotFound + | StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan, + StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk, + StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate, + _ if status.is_retryable() => ChunkFailureReaction::Propagate, + _ => ChunkFailureReaction::DiscardChunk, + } + } + + fn should_propagate_trace_span_failure(status: StatusCode) -> bool { + matches!( + Self::classify_trace_chunk_failure(status), + ChunkFailureReaction::Propagate + ) + } + + fn add_trace_write_cost(outcome: &mut TraceIngestOutcome, cost: usize) { + outcome.write_cost += cost; + } + + fn push_trace_failure_message(messages: &mut Vec, label: &str, message: String) { + OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).inc(); + + if messages.len() < TRACE_FAILURE_MESSAGE_LIMIT { + messages.push(message); + } else if messages.len() == TRACE_FAILURE_MESSAGE_LIMIT { + tracing::debug!( + label, + limit = TRACE_FAILURE_MESSAGE_LIMIT, + "Trace ingest failure message limit reached; suppressing additional failure details" + ); + } + } + + fn finish_trace_failure_message( + accepted_spans: usize, + rejected_spans: usize, + messages: Vec, + ) -> Option { + if rejected_spans == 0 && messages.is_empty() { + return None; + } + + let mut summary = format!( + "Accepted {} spans, rejected {} spans", + accepted_spans, rejected_spans + ); + + if !messages.is_empty() { + summary.push_str(": "); + summary.push_str(&messages.join("; ")); + } + + Some(summary) + } + /// Picks the final datatype for one trace column. /// /// Existing table schema is authoritative when present. Otherwise we resolve the @@ -428,3 +764,163 @@ fn push_observed_trace_type(observed_types: &mut Vec, datatype: observed_types.push(datatype); } } + +#[cfg(test)] +mod tests { + use common_error::status_code::StatusCode; + use servers::query_handler::TraceIngestOutcome; + + use super::{ChunkFailureReaction, Instance}; + use crate::metrics::OTLP_TRACES_FAILURE_COUNT; + + #[test] + fn test_classify_trace_chunk_failure() { + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::InvalidArguments), + ChunkFailureReaction::RetryPerSpan + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::InvalidSyntax), + ChunkFailureReaction::RetryPerSpan + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::Unsupported), + ChunkFailureReaction::RetryPerSpan + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::TableColumnNotFound), + ChunkFailureReaction::RetryPerSpan + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::TableNotFound), + ChunkFailureReaction::RetryPerSpan + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::DatabaseNotFound), + ChunkFailureReaction::DiscardChunk + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::DeadlineExceeded), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::Cancelled), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::StorageUnavailable), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::Internal), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::RegionNotReady), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::TableUnavailable), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::RegionBusy), + ChunkFailureReaction::Propagate + ); + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::RuntimeResourcesExhausted), + ChunkFailureReaction::Propagate + ); + } + + #[test] + fn test_classify_trace_span_failure() { + assert!(Instance::should_propagate_trace_span_failure( + StatusCode::DeadlineExceeded + )); + assert!(Instance::should_propagate_trace_span_failure( + StatusCode::StorageUnavailable + )); + assert!(!Instance::should_propagate_trace_span_failure( + StatusCode::InvalidArguments + )); + } + + #[test] + fn test_add_trace_write_cost() { + let mut outcome = TraceIngestOutcome::default(); + Instance::add_trace_write_cost(&mut outcome, 3); + Instance::add_trace_write_cost(&mut outcome, 5); + assert_eq!(outcome.write_cost, 8); + } + + #[test] + fn test_finish_trace_failure_message() { + let message = Instance::finish_trace_failure_message( + 3, + 2, + vec!["Rejected span trace:span (InvalidArguments)".to_string()], + ) + .unwrap(); + assert!(message.contains("Accepted 3 spans, rejected 2 spans")); + assert!(message.contains("Rejected span trace:span")); + + assert_eq!(Instance::finish_trace_failure_message(2, 0, vec![]), None); + } + + #[test] + fn test_finish_trace_failure_message_without_detail_messages() { + assert_eq!( + Instance::finish_trace_failure_message(0, 2, vec![]), + Some("Accepted 0 spans, rejected 2 spans".to_string()) + ); + } + + #[test] + fn test_push_trace_failure_message_increments_labeled_counter() { + let label = "retry_per_span_counter_test"; + let initial = OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get(); + let mut messages = Vec::new(); + + Instance::push_trace_failure_message( + &mut messages, + label, + "Chunk fallback triggered by InvalidArguments".to_string(), + ); + + assert_eq!(messages.len(), 1); + assert_eq!( + OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get(), + initial + 1 + ); + } + + #[test] + fn test_push_trace_failure_message_caps_recorded_messages() { + let label = "retry_per_span_limit_test"; + let mut messages = Vec::new(); + + for idx in 0..=4 { + Instance::push_trace_failure_message(&mut messages, label, format!("failure-{idx}")); + } + + assert_eq!(messages.len(), 4); + assert_eq!( + messages, + vec![ + "failure-0".to_string(), + "failure-1".to_string(), + "failure-2".to_string(), + "failure-3".to_string() + ] + ); + } + + #[test] + fn test_classify_trace_chunk_failure_defaults_to_discard() { + assert_eq!( + Instance::classify_trace_chunk_failure(StatusCode::Unknown), + ChunkFailureReaction::DiscardChunk + ); + } +} diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 58ba21476a..aba33637cf 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -52,6 +52,14 @@ lazy_static! { ) .unwrap(); + /// The number of OpenTelemetry trace ingest failures on the frontend node. + pub static ref OTLP_TRACES_FAILURE_COUNT: IntCounterVec = register_int_counter_vec!( + "greptime_frontend_otlp_traces_failure_count", + "frontend otlp trace ingest failure count", + &["label"] + ) + .unwrap(); + /// The number of OpenTelemetry logs send by frontend node. pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!( "greptime_frontend_otlp_logs_rows", diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 4fd2d42122..3d6057f046 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -29,7 +29,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ }; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse; use opentelemetry_proto::tonic::collector::trace::v1::{ - ExportTraceServiceRequest, ExportTraceServiceResponse, + ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse, }; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use pipeline::PipelineWay; @@ -175,11 +175,16 @@ pub async fn traces( query_ctx, ) .await - .map(|o| OtlpResponse { + .map(|outcome| OtlpResponse { resp_body: ExportTraceServiceResponse { - partial_success: None, + partial_success: outcome.error_message.map(|error_message| { + ExportTracePartialSuccess { + rejected_spans: outcome.rejected_spans as i64, + error_message, + } + }), }, - write_cost: o.meta.cost, + write_cost: outcome.write_cost, }) } diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index ca56f9b868..98f4441923 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -18,15 +18,17 @@ pub mod span; pub mod v0; pub mod v1; +use std::collections::HashSet; + use api::v1::RowInsertRequests; pub use common_catalog::consts::{ PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN, }; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; use session::context::QueryContextRef; use crate::error::{NotSupportedSnafu, Result}; +use crate::otlp::trace::span::TraceSpan; use crate::query_handler::PipelineHandlerRef; // column names @@ -65,27 +67,58 @@ pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_"; pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET"; pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR"; -/// Convert SpanTraces to GreptimeDB row insert requests. -/// Returns `InsertRequests` and total number of rows to ingest -pub fn to_grpc_insert_requests( - request: ExportTraceServiceRequest, - pipeline: PipelineWay, - pipeline_params: GreptimePipelineParams, - table_name: String, +/// Deduplicated auxiliary trace entities derived from successfully ingested +/// spans. +/// +/// The main trace table is written first. Once a span is confirmed accepted, we +/// record the service and operation tuples here so the auxiliary tables can be +/// updated separately without affecting span acceptance accounting. +#[derive(Debug, Default)] +pub struct TraceAuxData { + pub services: HashSet, + pub operations: HashSet<(String, String, String)>, +} + +impl TraceAuxData { + /// Records the auxiliary service and operation rows implied by one accepted + /// span. + pub fn observe_span(&mut self, span: &TraceSpan) { + if let Some(service_name) = &span.service_name { + self.services.insert(service_name.clone()); + self.operations.insert(( + service_name.clone(), + span.span_name.clone(), + span.span_kind.clone(), + )); + } + } + + /// Returns true when no auxiliary table updates are needed. + pub fn is_empty(&self) -> bool { + self.services.is_empty() && self.operations.is_empty() + } +} + +/// Convert a subset of trace spans to GreptimeDB row insert requests. +pub fn to_grpc_insert_requests_from_spans( + spans: &[TraceSpan], + pipeline: &PipelineWay, + pipeline_params: &GreptimePipelineParams, + table_name: &str, query_ctx: &QueryContextRef, pipeline_handler: PipelineHandlerRef, ) -> Result<(RowInsertRequests, usize)> { match pipeline { - PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests( - request, + PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests( + spans, pipeline, pipeline_params, table_name, query_ctx, pipeline_handler, ), - PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_insert_requests( - request, + PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests( + spans, pipeline, pipeline_params, table_name, @@ -98,3 +131,23 @@ pub fn to_grpc_insert_requests( .fail(), } } + +/// Build insert requests for the auxiliary trace tables derived from accepted +/// spans. +/// +/// "Aux" here refers to the trace service and trace operation tables, not the +/// main trace span table itself. +pub fn to_grpc_insert_requests_for_aux_tables( + aux_data: TraceAuxData, + pipeline: &PipelineWay, + table_name: &str, +) -> Result<(RowInsertRequests, usize)> { + match pipeline { + PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name), + PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name), + _ => NotSupportedSnafu { + feat: "Unsupported pipeline for trace", + } + .fail(), + } +} diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index d96bc17277..19103240f6 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -53,6 +53,18 @@ pub struct TraceSpan { pub type TraceSpans = Vec; +#[derive(Debug, Clone)] +pub struct TraceSpanGroup { + pub service_name: Option, + pub resource_attributes: Attributes, + pub scope_name: String, + pub scope_version: String, + pub scope_attributes: Attributes, + pub spans: TraceSpans, +} + +pub type TraceSpanGroups = Vec; + #[derive(Debug, Clone, Serialize)] pub struct SpanLink { pub trace_id: String, @@ -241,14 +253,13 @@ pub fn status_to_string(status: &Option) -> (String, String) { /// See /// /// for data structure of OTLP traces. -pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { - let span_size = request +pub fn parse(request: ExportTraceServiceRequest) -> TraceSpanGroups { + let group_size = request .resource_spans .iter() .flat_map(|res| res.scope_spans.iter()) - .flat_map(|scope| scope.spans.iter()) .count(); - let mut spans = Vec::with_capacity(span_size); + let mut groups = Vec::with_capacity(group_size); for resource_spans in request.resource_spans { let resource_attrs = resource_spans .resource @@ -268,6 +279,7 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { for scope_spans in resource_spans.scope_spans { let scope = scope_spans.scope.unwrap_or_default(); + let mut spans = Vec::with_capacity(scope_spans.spans.len()); for span in scope_spans.spans { spans.push(parse_span( service_name.clone(), @@ -276,16 +288,47 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { span, )); } + groups.push(TraceSpanGroup { + service_name: service_name.clone(), + resource_attributes: Attributes::from(&resource_attrs[..]), + scope_name: scope.name, + scope_version: scope.version, + scope_attributes: Attributes::from(scope.attributes), + spans, + }); } } - spans + groups } #[cfg(test)] mod tests { - use opentelemetry_proto::tonic::trace::v1::Status; + use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; + use opentelemetry_proto::tonic::common::v1::{ + AnyValue, InstrumentationScope, KeyValue, any_value, + }; + use opentelemetry_proto::tonic::resource::v1::Resource; + use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; - use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string}; + use crate::otlp::trace::KEY_SERVICE_NAME; + use crate::otlp::trace::span::{bytes_to_hex_string, parse, status_to_string}; + + fn make_kv(key: &str, value: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value.to_string())), + }), + } + } + + fn make_span(trace_id: u8, span_id: u8) -> Span { + Span { + trace_id: vec![trace_id; 16], + span_id: vec![span_id; 8], + ..Default::default() + } + } #[test] fn test_bytes_to_hex_string() { @@ -315,4 +358,62 @@ mod tests { status_to_string(&Some(status)), ); } + + #[test] + fn test_parse_preserves_resource_scope_groups() { + let request = ExportTraceServiceRequest { + resource_spans: vec![ + ResourceSpans { + resource: Some(Resource { + attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-a")], + ..Default::default() + }), + scope_spans: vec![ + ScopeSpans { + scope: Some(InstrumentationScope { + name: "scope-1".to_string(), + ..Default::default() + }), + spans: vec![make_span(0x11, 0x21), make_span(0x12, 0x22)], + ..Default::default() + }, + ScopeSpans { + scope: Some(InstrumentationScope { + name: "scope-2".to_string(), + ..Default::default() + }), + spans: vec![make_span(0x13, 0x23)], + ..Default::default() + }, + ], + ..Default::default() + }, + ResourceSpans { + resource: Some(Resource { + attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-b")], + ..Default::default() + }), + scope_spans: vec![ScopeSpans { + scope: Some(InstrumentationScope { + name: "scope-3".to_string(), + ..Default::default() + }), + spans: vec![make_span(0x14, 0x24)], + ..Default::default() + }], + ..Default::default() + }, + ], + }; + + let groups = parse(request); + assert_eq!(groups.len(), 3); + assert_eq!(groups[0].service_name.as_deref(), Some("svc-a")); + assert_eq!(groups[0].scope_name, "scope-1"); + assert_eq!(groups[0].spans.len(), 2); + assert_eq!(groups[1].scope_name, "scope-2"); + assert_eq!(groups[1].spans.len(), 1); + assert_eq!(groups[2].service_name.as_deref(), Some("svc-b")); + assert_eq!(groups[2].scope_name, "scope-3"); + } } diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs index b52b406fb2..fa10dcc00f 100644 --- a/src/servers/src/otlp/trace/v0.rs +++ b/src/servers/src/otlp/trace/v0.rs @@ -18,16 +18,16 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_catalog::consts::{trace_operations_table_name, trace_services_table_name}; use common_grpc::precision::Precision; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; use session::context::QueryContextRef; use crate::error::Result; -use crate::otlp::trace::span::{TraceSpan, parse}; +use crate::otlp::trace::span::TraceSpan; use crate::otlp::trace::{ DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, + TraceAuxData, }; use crate::otlp::utils::{make_column_data, make_string_column_data}; use crate::query_handler::PipelineHandlerRef; @@ -38,56 +38,52 @@ const APPROXIMATE_COLUMN_COUNT: usize = 24; // Use a timestamp(2100-01-01 00:00:00) as large as possible. const MAX_TIMESTAMP: i64 = 4102444800000000000; -/// Convert SpanTraces to GreptimeDB row insert requests. -/// Returns `InsertRequests` and total number of rows to ingest -pub fn v0_to_grpc_insert_requests( - request: ExportTraceServiceRequest, - _pipeline: PipelineWay, - _pipeline_params: GreptimePipelineParams, - table_name: String, +/// Converts trace spans into row insert requests for the main v0 trace table. +/// +/// Auxiliary service and operation table writes are built separately so the +/// caller can update them only after the main span write succeeds. +pub fn v0_to_grpc_main_insert_requests( + spans: &[TraceSpan], + _pipeline: &PipelineWay, + _pipeline_params: &GreptimePipelineParams, + table_name: &str, _query_ctx: &QueryContextRef, _pipeline_handler: PipelineHandlerRef, ) -> Result<(RowInsertRequests, usize)> { - let spans = parse(request); let mut multi_table_writer = MultiTableData::default(); + let trace_writer = build_trace_table_data(spans)?; + multi_table_writer.add_table_data(table_name, trace_writer); + + Ok(multi_table_writer.into_row_insert_requests()) +} + +/// Builds the row-oriented payload for the main v0 trace table. +pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result { let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); + for span in spans.iter().cloned() { + write_span_to_row(&mut trace_writer, span)?; + } + + Ok(trace_writer) +} + +/// Builds row insert requests for the v0 trace auxiliary tables. +pub fn build_aux_table_requests( + aux_data: TraceAuxData, + table_name: &str, +) -> Result<(RowInsertRequests, usize)> { + let mut multi_table_writer = MultiTableData::default(); let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); - let mut services = HashSet::new(); - let mut operations = HashSet::new(); - for span in spans { - if let Some(service_name) = &span.service_name { - // Only insert the service name if it's not already in the set. - if !services.contains(service_name) { - services.insert(service_name.clone()); - } - - // Collect operations (service_name + span_name + span_kind). - let operation = ( - service_name.clone(), - span.span_name.clone(), - span.span_kind.clone(), - ); - if !operations.contains(&operation) { - operations.insert(operation); - } - } - write_span_to_row(&mut trace_writer, span)?; - } - write_trace_services_to_row(&mut trace_services_writer, services)?; - write_trace_operations_to_row(&mut trace_operations_writer, operations)?; + write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?; + write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?; + multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer); multi_table_writer.add_table_data( - trace_services_table_name(&table_name), - trace_services_writer, - ); - multi_table_writer.add_table_data( - trace_operations_table_name(&table_name), + trace_operations_table_name(table_name), trace_operations_writer, ); - multi_table_writer.add_table_data(table_name, trace_writer); - Ok(multi_table_writer.into_row_insert_requests()) } @@ -232,3 +228,63 @@ fn write_trace_operations_to_row( Ok(()) } + +#[cfg(test)] +mod tests { + use super::{build_aux_table_requests, build_trace_table_data}; + use crate::otlp::trace::TraceAuxData; + use crate::otlp::trace::attributes::Attributes; + use crate::otlp::trace::span::{SpanEvents, SpanLinks, TraceSpan}; + + fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan { + TraceSpan { + service_name: Some(service_name.to_string()), + trace_id: trace_id.to_string(), + span_id: span_id.to_string(), + parent_span_id: None, + resource_attributes: Attributes::from(vec![]), + scope_name: "scope".to_string(), + scope_version: "v1".to_string(), + scope_attributes: Attributes::from(vec![]), + trace_state: String::new(), + span_name: "op".to_string(), + span_kind: "SPAN_KIND_SERVER".to_string(), + span_status_code: "STATUS_CODE_UNSET".to_string(), + span_status_message: String::new(), + span_attributes: Attributes::from(vec![]), + span_events: SpanEvents::from(vec![]), + span_links: SpanLinks::from(vec![]), + start_in_nanosecond: 1, + end_in_nanosecond: 2, + } + } + + #[test] + fn test_build_trace_table_data_from_span_subset() { + let spans = [ + make_span("svc-a", "trace-a", "span-a"), + make_span("svc-b", "trace-b", "span-b"), + ]; + + let writer = build_trace_table_data(&spans[..1]).unwrap(); + let (_, rows) = writer.into_schema_and_rows(); + assert_eq!(rows.len(), 1); + } + + #[test] + fn test_build_aux_table_requests_deduplicates_services_and_operations() { + let spans = vec![ + make_span("svc-a", "trace-a", "span-a"), + make_span("svc-a", "trace-b", "span-b"), + ]; + let mut aux_data = TraceAuxData::default(); + for span in &spans { + aux_data.observe_span(span); + } + + let (requests, total_rows) = + build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap(); + assert_eq!(requests.inserts.len(), 2); + assert_eq!(total_rows, 2); + } +} diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs index 11e986de04..cce6891b0d 100644 --- a/src/servers/src/otlp/trace/v1.rs +++ b/src/servers/src/otlp/trace/v1.rs @@ -18,19 +18,18 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests, Value}; use common_catalog::consts::{trace_operations_table_name, trace_services_table_name}; use common_grpc::precision::Precision; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue; use pipeline::{GreptimePipelineParams, PipelineWay}; use session::context::QueryContextRef; use crate::error::Result; use crate::otlp::trace::attributes::Attributes; -use crate::otlp::trace::span::{TraceSpan, parse}; +use crate::otlp::trace::span::TraceSpan; use crate::otlp::trace::{ DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN, - TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, + TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, TraceAuxData, }; use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data}; use crate::query_handler::PipelineHandlerRef; @@ -41,64 +40,52 @@ const APPROXIMATE_COLUMN_COUNT: usize = 30; // Use a timestamp(2100-01-01 00:00:00) as large as possible. const MAX_TIMESTAMP: i64 = 4102444800000000000; -/// Convert SpanTraces to GreptimeDB row insert requests. -/// Returns `InsertRequests` and total number of rows to ingest +/// Converts trace spans into row insert requests for the main v1 trace table. /// -/// Compared with v0, this v1 implementation: -/// 1. flattens all attribute data into columns. -/// 2. treat `span_id` and `parent_trace_id` as fields. -/// 3. removed `service_name` column because it's already in -/// `resource_attributes.service_name` -/// -/// For other compound data structures like span_links and span_events here we -/// are still using `json` data structure. -pub fn v1_to_grpc_insert_requests( - request: ExportTraceServiceRequest, - _pipeline: PipelineWay, - _pipeline_params: GreptimePipelineParams, - table_name: String, +/// Auxiliary service and operation table writes are built separately so the +/// caller can update them only after the main span write succeeds. +pub fn v1_to_grpc_main_insert_requests( + spans: &[TraceSpan], + _pipeline: &PipelineWay, + _pipeline_params: &GreptimePipelineParams, + table_name: &str, _query_ctx: &QueryContextRef, _pipeline_handler: PipelineHandlerRef, ) -> Result<(RowInsertRequests, usize)> { - let spans = parse(request); let mut multi_table_writer = MultiTableData::default(); + let trace_writer = build_trace_table_data(spans)?; + multi_table_writer.add_table_data(table_name, trace_writer); + + Ok(multi_table_writer.into_row_insert_requests()) +} + +/// Builds the row-oriented payload for the main v1 trace table. +pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result { let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len()); + for span in spans.iter().cloned() { + write_span_to_row(&mut trace_writer, span)?; + } + + Ok(trace_writer) +} + +/// Builds row insert requests for the v1 trace auxiliary tables. +pub fn build_aux_table_requests( + aux_data: TraceAuxData, + table_name: &str, +) -> Result<(RowInsertRequests, usize)> { + let mut multi_table_writer = MultiTableData::default(); let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1); - let mut services = HashSet::new(); - let mut operations = HashSet::new(); - for span in spans { - if let Some(service_name) = &span.service_name { - // Only insert the service name if it's not already in the set. - if !services.contains(service_name) { - services.insert(service_name.clone()); - } - - // Only insert the operation if it's not already in the set. - let operation = ( - service_name.clone(), - span.span_name.clone(), - span.span_kind.clone(), - ); - if !operations.contains(&operation) { - operations.insert(operation); - } - } - write_span_to_row(&mut trace_writer, span)?; - } - write_trace_services_to_row(&mut trace_services_writer, services)?; - write_trace_operations_to_row(&mut trace_operations_writer, operations)?; + write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?; + write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?; + multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer); multi_table_writer.add_table_data( - trace_services_table_name(&table_name), - trace_services_writer, - ); - multi_table_writer.add_table_data( - trace_operations_table_name(&table_name), + trace_operations_table_name(table_name), trace_operations_writer, ); - multi_table_writer.add_table_data(table_name, trace_writer); Ok(multi_table_writer.into_row_insert_requests()) } @@ -319,7 +306,9 @@ mod tests { use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use super::*; + use crate::otlp::trace::TraceAuxData; use crate::otlp::trace::attributes::Attributes; + use crate::otlp::trace::span::{SpanEvents, SpanLinks}; use crate::row_writer::TableData; fn make_kv(key: &str, value: OtlpValue) -> KeyValue { @@ -329,6 +318,29 @@ mod tests { } } + fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan { + TraceSpan { + service_name: Some(service_name.to_string()), + trace_id: trace_id.to_string(), + span_id: span_id.to_string(), + parent_span_id: None, + resource_attributes: Attributes::from(vec![]), + scope_name: "scope".to_string(), + scope_version: "v1".to_string(), + scope_attributes: Attributes::from(vec![]), + trace_state: String::new(), + span_name: "op".to_string(), + span_kind: "SPAN_KIND_SERVER".to_string(), + span_status_code: "STATUS_CODE_UNSET".to_string(), + span_status_message: String::new(), + span_attributes: Attributes::from(vec![]), + span_events: SpanEvents::from(vec![]), + span_links: SpanLinks::from(vec![]), + start_in_nanosecond: 1, + end_in_nanosecond: 2, + } + } + #[test] fn test_keep_mixed_numeric_values_until_frontend_reconciliation() { let mut writer = TableData::new(4, 2); @@ -520,5 +532,22 @@ mod tests { Some(ValueData::StringValue("false".to_string())) ); } + + #[test] + fn test_build_aux_table_requests_deduplicates_services_and_operations() { + let spans = vec![ + make_span("svc-a", "trace-a", "span-a"), + make_span("svc-a", "trace-b", "span-b"), + ]; + let mut aux_data = TraceAuxData::default(); + for span in &spans { + aux_data.observe_span(span); + } + + let (requests, total_rows) = + build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap(); + assert_eq!(requests.inserts.len(), 2); + assert_eq!(total_rows, 2); + } // Conversion matrix coverage lives in the shared coercion helper tests. } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index b55502e742..d4b272de12 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -63,6 +63,14 @@ pub type PipelineHandlerRef = Arc; pub type LogQueryHandlerRef = Arc; pub type JaegerQueryHandlerRef = Arc; +#[derive(Debug, Default, Clone)] +pub struct TraceIngestOutcome { + pub write_cost: usize, + pub accepted_spans: usize, + pub rejected_spans: usize, + pub error_message: Option, +} + #[async_trait] pub trait InfluxdbLineProtocolHandler { /// A successful request will not return a response. @@ -123,7 +131,7 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler { pipeline_params: GreptimePipelineParams, table_name: String, ctx: QueryContextRef, - ) -> Result; + ) -> Result; async fn logs( &self, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c0d858a592..36ddb1bb38 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -40,7 +40,9 @@ use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAd use loki_proto::prost_types::Timestamp; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, +}; use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME; use prost::Message; use serde_json::{Value, json}; @@ -5572,26 +5574,83 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ], ); let res = send_trace_v1_req(&client, abort_table_name, abort_req, false).await; - assert_eq!(StatusCode::BAD_REQUEST, res.status()); - let body: Value = res.json().await; + assert_eq!(StatusCode::OK, res.status()); + let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap(); + let partial_success = body.partial_success.as_ref().unwrap(); + assert_eq!(partial_success.rejected_spans, 1); assert!( - body["error"].as_str().unwrap().contains( - "failed to coerce trace column 'span_attributes.attr_int' in table 'trace_type_abort'" + partial_success + .error_message + .contains("Accepted 1 spans, rejected 1 spans"), + "unexpected partial success body: {body:?}" + ); + assert!( + partial_success.error_message.contains( + "Rejected span 00000000000000000000000000000013:0000000000000013 (InvalidArguments)" ), - "unexpected error body: {body}" + "unexpected partial success body: {body:?}" ); validate_data( "otlp_traces_v1_type_abort_rows", &client, &format!( - "select trace_id, \"span_attributes.attr_int\" from {} order by trace_id;", + "select trace_id, \"span_attributes.attr_int\" from {} order by trace_id", abort_table_name ), - r#"[["00000000000000000000000000000011",10]]"#, + r#"[["00000000000000000000000000000011",10],["00000000000000000000000000000012",20]]"#, ) .await; + let chunk_failure_req = make_trace_v1_request( + "type-discard", + vec![ + make_trace_v1_span( + "00000000000000000000000000000021", + "0000000000000021", + "discard-one", + 1_736_480_942_445_400_000, + 1_736_480_942_445_500_000, + vec![make_string_attr("attr_text", "alpha")], + ), + make_trace_v1_span( + "00000000000000000000000000000022", + "0000000000000022", + "discard-two", + 1_736_480_942_445_600_000, + 1_736_480_942_445_700_000, + vec![make_string_attr("attr_text", "beta")], + ), + ], + ); + let res = send_trace_v1_req_with_db( + &client, + "nonexistent", + "trace_chunk_discard", + chunk_failure_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap(); + let partial_success = body.partial_success.as_ref().unwrap(); + assert_eq!(partial_success.rejected_spans, 2); + assert!( + partial_success + .error_message + .contains("Accepted 0 spans, rejected 2 spans"), + "unexpected partial success body: {body:?}" + ); + assert!( + partial_success + .error_message + .contains("Chunk fallback triggered by") + || partial_success + .error_message + .contains("Discarded 2 spans after ambiguous chunk failure"), + "unexpected partial success body: {body:?}" + ); + guard.remove_all().await; } @@ -7829,6 +7888,16 @@ async fn send_trace_v1_req( table_name: &str, req: ExportTraceServiceRequest, with_gzip: bool, +) -> TestResponse { + send_trace_v1_req_with_db(client, "public", table_name, req, with_gzip).await +} + +async fn send_trace_v1_req_with_db( + client: &TestClient, + db_name: &str, + table_name: &str, + req: ExportTraceServiceRequest, + with_gzip: bool, ) -> TestResponse { send_req( client, @@ -7845,6 +7914,10 @@ async fn send_trace_v1_req( HeaderName::from_static("x-greptime-trace-table-name"), HeaderValue::from_str(table_name).unwrap(), ), + ( + GREPTIME_DB_HEADER_NAME.clone(), + HeaderValue::from_str(db_name).unwrap(), + ), ], "/v1/otlp/v1/traces", req.encode_to_vec(),