From a9256f031072e48a05816ec6b86aadb091e64fb8 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:13:44 +0800 Subject: [PATCH] refactor: extract otel helper (#7910) * refactor: extract otel helper Signed-off-by: shuiyisong * chore: move to submodule Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- src/frontend/src/instance/otlp.rs | 257 +-------------- src/frontend/src/instance/otlp/trace_types.rs | 308 ++++++++++++++++++ 2 files changed, 317 insertions(+), 248 deletions(-) create mode 100644 src/frontend/src/instance/otlp/trace_types.rs diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 8b3f8b3eec..75168b3b9a 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -36,10 +36,7 @@ use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; use servers::otlp::trace::TraceAuxData; -use servers::otlp::trace::coerce::{ - coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type, - trace_value_datatype, -}; +use servers::otlp::trace::coerce::{coerce_value_data, trace_value_datatype}; use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup}; use servers::query_handler::{ OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome, @@ -49,10 +46,16 @@ use snafu::{IntoError, ResultExt}; use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; +use crate::instance::otlp::trace_types::{ + PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error, + is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites, +}; use crate::metrics::{ OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS, }; +pub mod trace_types; + const TRACE_INGEST_CHUNK_SIZE: usize = 64; const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4; @@ -63,33 +66,6 @@ enum ChunkFailureReaction { Propagate, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum TraceReconcileDecision { - UseExisting(ColumnDataType), - UseRequestLocal(ColumnDataType), - AlterExistingTo(ColumnDataType), -} - -impl TraceReconcileDecision { - fn target_type(self) -> ColumnDataType { - match self { - Self::UseExisting(target_type) - | Self::UseRequestLocal(target_type) - | Self::AlterExistingTo(target_type) => target_type, - } - } - - fn requires_alter(self) -> bool { - matches!(self, Self::AlterExistingTo(_)) - } -} - -struct PendingTraceColumnRewrite { - col_idx: usize, - target_type: ColumnDataType, - column_name: String, -} - impl ChunkFailureReaction { fn as_metric_label(self) -> &'static str { match self { @@ -576,52 +552,6 @@ impl Instance { Some(summary) } - /// Picks the reconciliation action for one trace column. - /// - /// Existing table schema is authoritative unless the only incompatible case is - /// widening an existing Int64 column to Float64 for incoming Int64/Float64 data. - fn choose_trace_reconcile_decision( - observed_types: &[ColumnDataType], - existing_type: Option, - ) -> ServerResult> { - let Some(existing_type) = existing_type else { - return resolve_new_trace_column_type(observed_types.iter().copied()) - .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal)) - .map_err(|_| { - error::InvalidParameterSnafu { - reason: "unsupported trace type mix".to_string(), - } - .build() - }); - }; - - if observed_types.iter().all(|&request_type| { - request_type == existing_type - || is_supported_trace_coercion(request_type, existing_type) - }) { - return Ok(Some(TraceReconcileDecision::UseExisting(existing_type))); - } - - if existing_type == ColumnDataType::Int64 - && observed_types.contains(&ColumnDataType::Float64) - && observed_types.iter().all(|observed_type| { - matches!( - observed_type, - ColumnDataType::Int64 | ColumnDataType::Float64 - ) - }) - { - return Ok(Some(TraceReconcileDecision::AlterExistingTo( - ColumnDataType::Float64, - ))); - } - - error::InvalidParameterSnafu { - reason: "unsupported trace type mix".to_string(), - } - .fail() - } - /// Widen existing trace table columns to Float64 before request rewrite. async fn alter_trace_table_columns_to_float64( &self, @@ -763,7 +693,7 @@ impl Instance { // Decide the final type once per column, then rewrite all affected cells // together in one row pass below. let Some(decision) = - Self::choose_trace_reconcile_decision(&observed_types, existing_type).map_err( + choose_trace_reconcile_decision(&observed_types, existing_type).map_err( |_| { enrich_trace_reconcile_error( &req.table_name, @@ -860,44 +790,6 @@ impl Instance { } } -/// Validate all pending trace column rewrites before any schema mutation happens. -fn validate_trace_column_rewrites( - rows: &[api::v1::Row], - pending_rewrites: &[PendingTraceColumnRewrite], - table_name: &str, -) -> ServerResult<()> { - for row in rows { - for pending_rewrite in pending_rewrites { - let Some(value) = row.values.get(pending_rewrite.col_idx) else { - continue; - }; - let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype) - else { - continue; - }; - if request_type == pending_rewrite.target_type { - continue; - } - - coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type) - .map_err(|_| { - error::InvalidParameterSnafu { - reason: format!( - "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}", - pending_rewrite.column_name, - table_name, - request_type, - pending_rewrite.target_type - ), - } - .build() - })?; - } - } - - Ok(()) -} - /// Preserve the original alter failure status so chunk retry behavior stays correct. fn wrap_trace_alter_failure(err: E) -> servers::error::Error where @@ -906,64 +798,13 @@ where error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err)) } -fn enrich_trace_reconcile_error( - table_name: &str, - column_name: &str, - observed_types: &[ColumnDataType], - existing_type: Option, -) -> servers::error::Error { - let observed_types = observed_types - .iter() - .map(|datatype| format!("{datatype:?}")) - .collect::>() - .join(", "); - - error::InvalidParameterSnafu { - reason: match existing_type { - Some(existing_type) => format!( - "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}", - column_name, table_name, observed_types, existing_type - ), - None => format!( - "failed to reconcile trace column '{}' in table '{}' with observed types [{}]", - column_name, table_name, observed_types - ), - }, - } - .build() -} - -/// Only these trace scalar types participate in reconciliation. Other column kinds -/// such as JSON and binary keep their original write path and schema checks. -fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool { - matches!( - datatype, - ColumnDataType::String - | ColumnDataType::Boolean - | ColumnDataType::Int64 - | ColumnDataType::Float64 - ) -} - -/// Keeps the observed type list small without depending on enum ordering. -fn push_observed_trace_type(observed_types: &mut Vec, datatype: ColumnDataType) { - if !observed_types.contains(&datatype) { - observed_types.push(datatype); - } -} - #[cfg(test)] mod tests { - use api::v1::value::ValueData; - use api::v1::{ColumnDataType, Row, Value}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use servers::query_handler::TraceIngestOutcome; - use super::{ - ChunkFailureReaction, Instance, PendingTraceColumnRewrite, TraceReconcileDecision, - validate_trace_column_rewrites, wrap_trace_alter_failure, - }; + use super::{ChunkFailureReaction, Instance, wrap_trace_alter_failure}; use crate::metrics::OTLP_TRACES_FAILURE_COUNT; #[test] @@ -1117,86 +958,6 @@ mod tests { ); } - #[test] - fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() { - assert_eq!( - Instance::choose_trace_reconcile_decision( - &[ColumnDataType::Int64], - Some(ColumnDataType::Int64) - ) - .unwrap(), - Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64)) - ); - } - - #[test] - fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() { - assert_eq!( - Instance::choose_trace_reconcile_decision( - &[ColumnDataType::Int64, ColumnDataType::Float64], - Some(ColumnDataType::Int64) - ) - .unwrap(), - Some(TraceReconcileDecision::AlterExistingTo( - ColumnDataType::Float64 - )) - ); - } - - #[test] - fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() { - assert_eq!( - Instance::choose_trace_reconcile_decision( - &[ColumnDataType::Int64, ColumnDataType::Float64], - Some(ColumnDataType::Float64) - ) - .unwrap(), - Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64)) - ); - } - - #[test] - fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() { - let err = Instance::choose_trace_reconcile_decision( - &[ColumnDataType::Boolean, ColumnDataType::Int64], - Some(ColumnDataType::Int64), - ) - .unwrap_err(); - assert_eq!(err.status_code(), StatusCode::InvalidArguments); - } - - #[test] - fn test_choose_trace_reconcile_decision_request_local_prefers_float64() { - assert_eq!( - Instance::choose_trace_reconcile_decision( - &[ColumnDataType::Int64, ColumnDataType::Float64], - None - ) - .unwrap(), - Some(TraceReconcileDecision::UseRequestLocal( - ColumnDataType::Float64 - )) - ); - } - - #[test] - fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() { - let rows = vec![Row { - values: vec![Value { - value_data: Some(ValueData::StringValue("not_a_number".to_string())), - }], - }]; - let pending_rewrites = vec![PendingTraceColumnRewrite { - col_idx: 0, - target_type: ColumnDataType::Int64, - column_name: "span_attributes.attr_int".to_string(), - }]; - - let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity") - .unwrap_err(); - assert_eq!(err.status_code(), StatusCode::InvalidArguments); - } - #[test] fn test_wrap_trace_alter_failure_preserves_status_code() { let err = wrap_trace_alter_failure( diff --git a/src/frontend/src/instance/otlp/trace_types.rs b/src/frontend/src/instance/otlp/trace_types.rs new file mode 100644 index 0000000000..0be3df550e --- /dev/null +++ b/src/frontend/src/instance/otlp/trace_types.rs @@ -0,0 +1,308 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::{ColumnDataType, Row}; +use servers::error::{self, Result as ServerResult}; +use servers::otlp::trace::coerce::{ + coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type, + trace_value_datatype, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum TraceReconcileDecision { + UseExisting(ColumnDataType), + UseRequestLocal(ColumnDataType), + AlterExistingTo(ColumnDataType), +} + +impl TraceReconcileDecision { + pub(super) fn target_type(self) -> ColumnDataType { + match self { + Self::UseExisting(target_type) + | Self::UseRequestLocal(target_type) + | Self::AlterExistingTo(target_type) => target_type, + } + } + + pub(super) fn requires_alter(self) -> bool { + matches!(self, Self::AlterExistingTo(_)) + } +} + +pub(super) struct PendingTraceColumnRewrite { + pub(super) col_idx: usize, + pub(super) target_type: ColumnDataType, + pub(super) column_name: String, +} + +/// Picks the reconciliation action for one trace column. +/// +/// Existing table schema is authoritative unless the only incompatible case is +/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data. +pub(super) fn choose_trace_reconcile_decision( + observed_types: &[ColumnDataType], + existing_type: Option, +) -> ServerResult> { + let Some(existing_type) = existing_type else { + return resolve_new_trace_column_type(observed_types.iter().copied()) + .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal)) + .map_err(|_| { + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .build() + }); + }; + + if observed_types.iter().all(|&request_type| { + request_type == existing_type || is_supported_trace_coercion(request_type, existing_type) + }) { + return Ok(Some(TraceReconcileDecision::UseExisting(existing_type))); + } + + if existing_type == ColumnDataType::Int64 + && observed_types.contains(&ColumnDataType::Float64) + && observed_types.iter().all(|observed_type| { + matches!( + observed_type, + ColumnDataType::Int64 | ColumnDataType::Float64 + ) + }) + { + return Ok(Some(TraceReconcileDecision::AlterExistingTo( + ColumnDataType::Float64, + ))); + } + + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .fail() +} + +/// Validate all pending trace column rewrites before any schema mutation happens. +pub(super) fn validate_trace_column_rewrites( + rows: &[Row], + pending_rewrites: &[PendingTraceColumnRewrite], + table_name: &str, +) -> ServerResult<()> { + for row in rows { + for pending_rewrite in pending_rewrites { + let Some(value) = row.values.get(pending_rewrite.col_idx) else { + continue; + }; + let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype) + else { + continue; + }; + if request_type == pending_rewrite.target_type { + continue; + } + + coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type) + .map_err(|_| { + error::InvalidParameterSnafu { + reason: format!( + "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}", + pending_rewrite.column_name, + table_name, + request_type, + pending_rewrite.target_type + ), + } + .build() + })?; + } + } + + Ok(()) +} + +pub(super) fn enrich_trace_reconcile_error( + table_name: &str, + column_name: &str, + observed_types: &[ColumnDataType], + existing_type: Option, +) -> servers::error::Error { + let observed_types = observed_types + .iter() + .map(|datatype| format!("{datatype:?}")) + .collect::>() + .join(", "); + + error::InvalidParameterSnafu { + reason: match existing_type { + Some(existing_type) => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}", + column_name, table_name, observed_types, existing_type + ), + None => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}]", + column_name, table_name, observed_types + ), + }, + } + .build() +} + +/// Only these trace scalar types participate in reconciliation. Other column kinds +/// such as JSON and binary keep their original write path and schema checks. +pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool { + matches!( + datatype, + ColumnDataType::String + | ColumnDataType::Boolean + | ColumnDataType::Int64 + | ColumnDataType::Float64 + ) +} + +/// Keeps the observed type list small without depending on enum ordering. +pub(super) fn push_observed_trace_type( + observed_types: &mut Vec, + datatype: ColumnDataType, +) { + if !observed_types.contains(&datatype) { + observed_types.push(datatype); + } +} + +#[cfg(test)] +mod tests { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, Row, Value}; + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; + + use super::{ + PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision, + enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type, + validate_trace_column_rewrites, + }; + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() { + assert_eq!( + choose_trace_reconcile_decision(&[ColumnDataType::Int64], Some(ColumnDataType::Int64)) + .unwrap(), + Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64)) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() { + assert_eq!( + choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + Some(ColumnDataType::Int64) + ) + .unwrap(), + Some(TraceReconcileDecision::AlterExistingTo( + ColumnDataType::Float64 + )) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() { + assert_eq!( + choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + Some(ColumnDataType::Float64) + ) + .unwrap(), + Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64)) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() { + let err = choose_trace_reconcile_decision( + &[ColumnDataType::Boolean, ColumnDataType::Int64], + Some(ColumnDataType::Int64), + ) + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_choose_trace_reconcile_decision_request_local_prefers_float64() { + assert_eq!( + choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + None + ) + .unwrap(), + Some(TraceReconcileDecision::UseRequestLocal( + ColumnDataType::Float64 + )) + ); + } + + #[test] + fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() { + let rows = vec![Row { + values: vec![Value { + value_data: Some(ValueData::StringValue("not_a_number".to_string())), + }], + }]; + let pending_rewrites = vec![PendingTraceColumnRewrite { + col_idx: 0, + target_type: ColumnDataType::Int64, + column_name: "span_attributes.attr_int".to_string(), + }]; + + let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity") + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_enrich_trace_reconcile_error_includes_existing_type() { + let err = enrich_trace_reconcile_error( + "trace_type_atomicity", + "span_attributes.attr_int", + &[ColumnDataType::String, ColumnDataType::Int64], + Some(ColumnDataType::Boolean), + ); + + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + assert!(err.to_string().contains("span_attributes.attr_int")); + assert!(err.to_string().contains("Boolean")); + } + + #[test] + fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() { + assert!(is_trace_reconcile_candidate_type(ColumnDataType::String)); + assert!(is_trace_reconcile_candidate_type(ColumnDataType::Boolean)); + assert!(!is_trace_reconcile_candidate_type(ColumnDataType::Binary)); + assert!(!is_trace_reconcile_candidate_type( + ColumnDataType::TimestampMillisecond + )); + } + + #[test] + fn test_push_observed_trace_type_deduplicates_types() { + let mut observed_types = Vec::new(); + + push_observed_trace_type(&mut observed_types, ColumnDataType::Int64); + push_observed_trace_type(&mut observed_types, ColumnDataType::Int64); + push_observed_trace_type(&mut observed_types, ColumnDataType::Float64); + + assert_eq!( + observed_types, + vec![ColumnDataType::Int64, ColumnDataType::Float64] + ); + } +}