diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 8cda639686..8b3f8b3eec 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -15,14 +15,17 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{ColumnDataType, RowInsertRequests}; +use api::v1::alter_table_expr::Kind; +use api::v1::{ + AlterTableExpr, ColumnDataType, ModifyColumnType, ModifyColumnTypes, RowInsertRequests, +}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; -use common_telemetry::tracing; +use common_telemetry::{tracing, warn}; use itertools::Itertools; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; @@ -42,7 +45,7 @@ use servers::query_handler::{ OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome, }; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{IntoError, ResultExt}; use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; @@ -60,6 +63,33 @@ enum ChunkFailureReaction { Propagate, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TraceReconcileDecision { + UseExisting(ColumnDataType), + UseRequestLocal(ColumnDataType), + AlterExistingTo(ColumnDataType), +} + +impl TraceReconcileDecision { + fn target_type(self) -> ColumnDataType { + match self { + Self::UseExisting(target_type) + | Self::UseRequestLocal(target_type) + | Self::AlterExistingTo(target_type) => target_type, + } + } + + fn requires_alter(self) -> bool { + matches!(self, Self::AlterExistingTo(_)) + } +} + +struct PendingTraceColumnRewrite { + col_idx: usize, + target_type: ColumnDataType, + column_name: String, +} + impl ChunkFailureReaction { fn as_metric_label(self) -> &'static str { match self { @@ -546,34 +576,118 @@ impl Instance { Some(summary) } - /// Picks the final datatype for one trace column. + /// Picks the reconciliation action for one trace column. /// - /// Existing table schema is authoritative when present. Otherwise we resolve the - /// request-local observed types using the shared trace coercion rules. - fn choose_trace_target_type( + /// Existing table schema is authoritative unless the only incompatible case is + /// widening an existing Int64 column to Float64 for incoming Int64/Float64 data. + fn choose_trace_reconcile_decision( observed_types: &[ColumnDataType], existing_type: Option, - ) -> ServerResult> { + ) -> ServerResult> { let Some(existing_type) = existing_type else { - return resolve_new_trace_column_type(observed_types.iter().copied()).map_err(|_| { - error::InvalidParameterSnafu { - reason: "unsupported trace type mix".to_string(), - } - .build() - }); + return resolve_new_trace_column_type(observed_types.iter().copied()) + .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal)) + .map_err(|_| { + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .build() + }); }; - if observed_types.iter().copied().all(|request_type| { + if observed_types.iter().all(|&request_type| { request_type == existing_type || is_supported_trace_coercion(request_type, existing_type) }) { - Ok(Some(existing_type)) - } else { - error::InvalidParameterSnafu { - reason: "unsupported trace type mix".to_string(), - } - .fail() + return Ok(Some(TraceReconcileDecision::UseExisting(existing_type))); } + + if existing_type == ColumnDataType::Int64 + && observed_types.contains(&ColumnDataType::Float64) + && observed_types.iter().all(|observed_type| { + matches!( + observed_type, + ColumnDataType::Int64 | ColumnDataType::Float64 + ) + }) + { + return Ok(Some(TraceReconcileDecision::AlterExistingTo( + ColumnDataType::Float64, + ))); + } + + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .fail() + } + + /// Widen existing trace table columns to Float64 before request rewrite. + async fn alter_trace_table_columns_to_float64( + &self, + ctx: &QueryContextRef, + table_name: &str, + column_names: &[String], + ) -> ServerResult<()> { + let catalog_name = ctx.current_catalog().to_string(); + let schema_name = ctx.current_schema(); + let alter_expr = AlterTableExpr { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.to_string(), + kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes { + modify_column_types: column_names + .iter() + .map(|column_name| ModifyColumnType { + column_name: column_name.clone(), + target_type: ColumnDataType::Float64 as i32, + target_type_extension: None, + }) + .collect(), + })), + }; + + if let Err(err) = self + .statement_executor + .alter_table_inner(alter_expr, ctx.clone()) + .await + { + let table = self + .catalog_manager + .table(&catalog_name, &schema_name, table_name, None) + .await + .map_err(servers::error::Error::from)?; + let alter_already_applied = table + .map(|table| { + let table_schema = table.schema(); + column_names.iter().all(|column_name| { + table_schema + .column_schema_by_name(column_name) + .and_then(|table_col| { + ColumnDataTypeWrapper::try_from(table_col.data_type.clone()) + .ok() + .map(|wrapper| wrapper.datatype()) + }) + == Some(ColumnDataType::Float64) + }) + }) + .unwrap_or(false); + + if alter_already_applied { + return Ok(()); + } + + warn!( + table_name, + columns = ?column_names, + error = %err, + "failed to widen trace columns before insert" + ); + + return Err(wrap_trace_alter_failure(err)); + } + + Ok(()) } /// Coerce request column types and values to match the existing table schema @@ -598,7 +712,8 @@ impl Instance { }; let table_schema = table.map(|table| table.schema()); - let mut pending_coercions = Vec::new(); + let mut pending_rewrites = Vec::new(); + let mut pending_alter_columns = Vec::new(); for (col_idx, col_schema) in rows.schema.iter().enumerate() { let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else { @@ -647,8 +762,8 @@ impl Instance { // Decide the final type once per column, then rewrite all affected cells // together in one row pass below. - let Some(target_type) = - Self::choose_trace_target_type(&observed_types, existing_type).map_err( + let Some(decision) = + Self::choose_trace_reconcile_decision(&observed_types, existing_type).map_err( |_| { enrich_trace_reconcile_error( &req.table_name, @@ -661,31 +776,54 @@ impl Instance { else { continue; }; + let target_type = decision.target_type(); - if observed_types - .iter() - .all(|observed| *observed == target_type) + if !decision.requires_alter() + && observed_types + .iter() + .all(|observed| *observed == target_type) && col_schema.datatype == target_type as i32 { continue; } - pending_coercions.push((col_idx, target_type, col_schema.column_name.clone())); + if decision.requires_alter() + && !pending_alter_columns.contains(&col_schema.column_name) + { + pending_alter_columns.push(col_schema.column_name.clone()); + } + + pending_rewrites.push(PendingTraceColumnRewrite { + col_idx, + target_type, + column_name: col_schema.column_name.clone(), + }); } - if pending_coercions.is_empty() { + if pending_rewrites.is_empty() { continue; } + validate_trace_column_rewrites(&rows.rows, &pending_rewrites, &req.table_name)?; + + if !pending_alter_columns.is_empty() { + self.alter_trace_table_columns_to_float64( + ctx, + &req.table_name, + &pending_alter_columns, + ) + .await?; + } + // Update schema metadata before mutating row values so both stay in sync. - for (col_idx, target_type, ..) in &pending_coercions { - rows.schema[*col_idx].datatype = *target_type as i32; + for pending_rewrite in &pending_rewrites { + rows.schema[pending_rewrite.col_idx].datatype = pending_rewrite.target_type as i32; } // Apply all pending column rewrites in one row pass. for row in &mut rows.rows { - for (col_idx, target_type, column_name) in &pending_coercions { - let Some(value) = row.values.get_mut(*col_idx) else { + for pending_rewrite in &pending_rewrites { + let Some(value) = row.values.get_mut(pending_rewrite.col_idx) else { continue; }; let Some(request_type) = @@ -693,20 +831,23 @@ impl Instance { else { continue; }; - if request_type == *target_type { + if request_type == pending_rewrite.target_type { continue; } value.value_data = coerce_value_data( &value.value_data, - *target_type, + pending_rewrite.target_type, request_type, ) .map_err(|_| { error::InvalidParameterSnafu { reason: format!( "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}", - column_name, req.table_name, request_type, target_type + pending_rewrite.column_name, + req.table_name, + request_type, + pending_rewrite.target_type ), } .build() @@ -719,6 +860,52 @@ impl Instance { } } +/// Validate all pending trace column rewrites before any schema mutation happens. +fn validate_trace_column_rewrites( + rows: &[api::v1::Row], + pending_rewrites: &[PendingTraceColumnRewrite], + table_name: &str, +) -> ServerResult<()> { + for row in rows { + for pending_rewrite in pending_rewrites { + let Some(value) = row.values.get(pending_rewrite.col_idx) else { + continue; + }; + let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype) + else { + continue; + }; + if request_type == pending_rewrite.target_type { + continue; + } + + coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type) + .map_err(|_| { + error::InvalidParameterSnafu { + reason: format!( + "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}", + pending_rewrite.column_name, + table_name, + request_type, + pending_rewrite.target_type + ), + } + .build() + })?; + } + } + + Ok(()) +} + +/// Preserve the original alter failure status so chunk retry behavior stays correct. +fn wrap_trace_alter_failure(err: E) -> servers::error::Error +where + E: ErrorExt + Send + Sync + 'static, +{ + error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err)) +} + fn enrich_trace_reconcile_error( table_name: &str, column_name: &str, @@ -767,10 +954,16 @@ fn push_observed_trace_type(observed_types: &mut Vec, datatype: #[cfg(test)] mod tests { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, Row, Value}; + use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use servers::query_handler::TraceIngestOutcome; - use super::{ChunkFailureReaction, Instance}; + use super::{ + ChunkFailureReaction, Instance, PendingTraceColumnRewrite, TraceReconcileDecision, + validate_trace_column_rewrites, wrap_trace_alter_failure, + }; use crate::metrics::OTLP_TRACES_FAILURE_COUNT; #[test] @@ -923,4 +1116,98 @@ mod tests { ChunkFailureReaction::DiscardChunk ); } + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() { + assert_eq!( + Instance::choose_trace_reconcile_decision( + &[ColumnDataType::Int64], + Some(ColumnDataType::Int64) + ) + .unwrap(), + Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64)) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() { + assert_eq!( + Instance::choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + Some(ColumnDataType::Int64) + ) + .unwrap(), + Some(TraceReconcileDecision::AlterExistingTo( + ColumnDataType::Float64 + )) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() { + assert_eq!( + Instance::choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + Some(ColumnDataType::Float64) + ) + .unwrap(), + Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64)) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() { + let err = Instance::choose_trace_reconcile_decision( + &[ColumnDataType::Boolean, ColumnDataType::Int64], + Some(ColumnDataType::Int64), + ) + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_choose_trace_reconcile_decision_request_local_prefers_float64() { + assert_eq!( + Instance::choose_trace_reconcile_decision( + &[ColumnDataType::Int64, ColumnDataType::Float64], + None + ) + .unwrap(), + Some(TraceReconcileDecision::UseRequestLocal( + ColumnDataType::Float64 + )) + ); + } + + #[test] + fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() { + let rows = vec![Row { + values: vec![Value { + value_data: Some(ValueData::StringValue("not_a_number".to_string())), + }], + }]; + let pending_rewrites = vec![PendingTraceColumnRewrite { + col_idx: 0, + target_type: ColumnDataType::Int64, + column_name: "span_attributes.attr_int".to_string(), + }]; + + let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity") + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_wrap_trace_alter_failure_preserves_status_code() { + let err = wrap_trace_alter_failure( + servers::error::TableNotFoundSnafu { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "trace_type_missing".to_string(), + } + .build(), + ); + + assert_eq!(err.status_code(), StatusCode::TableNotFound); + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index caf5b2d11c..21e707e4d0 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -5523,6 +5523,202 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; + let existing_int_table_name = "trace_type_existing_int_widens_to_float"; + let existing_int_seed_req = make_trace_v1_request( + "type-existing-int", + vec![make_trace_v1_span( + "00000000000000000000000000000051", + "0000000000000051", + "existing-int-seed", + 1_736_480_942_445_490_000, + 1_736_480_942_445_590_000, + vec![make_int_attr("attr_num", 1)], + )], + ); + let res = send_trace_v1_req( + &client, + existing_int_table_name, + existing_int_seed_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let existing_int_req = make_trace_v1_request( + "type-existing-int", + vec![ + make_trace_v1_span( + "00000000000000000000000000000052", + "0000000000000052", + "existing-int-upcast-int", + 1_736_480_942_445_600_000, + 1_736_480_942_445_700_000, + vec![make_int_attr("attr_num", 2)], + ), + make_trace_v1_span( + "00000000000000000000000000000053", + "0000000000000053", + "existing-int-upcast-float", + 1_736_480_942_445_710_000, + 1_736_480_942_445_810_000, + vec![make_double_attr("attr_num", 3.5)], + ), + ], + ); + let res = send_trace_v1_req(&client, existing_int_table_name, existing_int_req, false).await; + assert_eq!(StatusCode::OK, res.status()); + + validate_data( + "otlp_traces_v1_existing_int_widens_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_num\" from {} order by trace_id;", + existing_int_table_name + ), + r#"[["00000000000000000000000000000051",1.0],["00000000000000000000000000000052",2.0],["00000000000000000000000000000053",3.5]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_existing_int_widens_type", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_existing_int_widens_to_float' and column_name = 'span_attributes.attr_num';", + r#"[["span_attributes.attr_num","double","FIELD"]]"#, + ) + .await; + + let existing_int_atomic_table_name = "trace_type_existing_int_widen_atomic"; + let existing_int_atomic_seed_req = make_trace_v1_request( + "type-existing-int-atomic", + vec![make_trace_v1_span( + "00000000000000000000000000000054", + "0000000000000054", + "existing-int-atomic-seed", + 1_736_480_942_445_720_000, + 1_736_480_942_445_820_000, + vec![ + make_int_attr("attr_num", 1), + make_int_attr("attr_parse", 10), + ], + )], + ); + let res = send_trace_v1_req( + &client, + existing_int_atomic_table_name, + existing_int_atomic_seed_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let existing_int_atomic_req = make_trace_v1_request( + "type-existing-int-atomic", + vec![make_trace_v1_span( + "00000000000000000000000000000055", + "0000000000000055", + "existing-int-atomic-invalid", + 1_736_480_942_445_830_000, + 1_736_480_942_445_930_000, + vec![ + make_double_attr("attr_num", 3.5), + make_string_attr("attr_parse", "not_a_number"), + ], + )], + ); + let res = send_trace_v1_req( + &client, + existing_int_atomic_table_name, + existing_int_atomic_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap(); + let partial_success = body.partial_success.as_ref().unwrap(); + assert_eq!(partial_success.rejected_spans, 1); + assert!( + partial_success + .error_message + .contains("Accepted 0 spans, rejected 1 spans"), + "unexpected partial success body: {body:?}" + ); + + validate_data( + "otlp_traces_v1_existing_int_widen_atomic_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_num\", \"span_attributes.attr_parse\" from {} order by trace_id;", + existing_int_atomic_table_name + ), + r#"[["00000000000000000000000000000054",1,10]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_existing_int_widen_atomic_types", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_existing_int_widen_atomic' and column_name in ('span_attributes.attr_num', 'span_attributes.attr_parse') order by column_name;", + r#"[["span_attributes.attr_num","bigint","FIELD"],["span_attributes.attr_parse","bigint","FIELD"]]"#, + ) + .await; + + let existing_int_float_only_table_name = "trace_type_existing_int_float_only"; + let existing_int_float_only_seed_req = make_trace_v1_request( + "type-existing-int-float-only", + vec![make_trace_v1_span( + "00000000000000000000000000000061", + "0000000000000061", + "existing-int-float-only-seed", + 1_736_480_942_445_820_000, + 1_736_480_942_445_920_000, + vec![make_int_attr("attr_num", 1)], + )], + ); + let res = send_trace_v1_req( + &client, + existing_int_float_only_table_name, + existing_int_float_only_seed_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let existing_int_float_only_req = make_trace_v1_request( + "type-existing-int-float-only", + vec![make_trace_v1_span( + "00000000000000000000000000000062", + "0000000000000062", + "existing-int-float-only-apply", + 1_736_480_942_445_930_000, + 1_736_480_942_446_030_000, + vec![make_double_attr("attr_num", 2.5)], + )], + ); + let res = send_trace_v1_req( + &client, + existing_int_float_only_table_name, + existing_int_float_only_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + validate_data( + "otlp_traces_v1_existing_int_float_only_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_num\" from {} order by trace_id;", + existing_int_float_only_table_name + ), + r#"[["00000000000000000000000000000061",1.0],["00000000000000000000000000000062",2.5]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_existing_int_float_only_type", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_existing_int_float_only' and column_name = 'span_attributes.attr_num';", + r#"[["span_attributes.attr_num","double","FIELD"]]"#, + ) + .await; + validate_data( "otlp_traces_v1_type_coercion_rows", &client,