diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 52df274780..9b21f9924f 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +use api::helper::ColumnDataTypeWrapper; +use api::v1::{ColumnDataType, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; @@ -24,10 +26,14 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; -use servers::error::{self, AuthSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, CatalogSnafu, Result as ServerResult}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; +use servers::otlp::trace::coerce::{ + coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type, + trace_value_datatype, +}; use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef}; use session::context::QueryContextRef; use snafu::ResultExt; @@ -124,7 +130,7 @@ impl OpenTelemetryProtocolHandler for Instance { let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1); - let (requests, rows) = otlp::trace::to_grpc_insert_requests( + let (mut requests, rows) = otlp::trace::to_grpc_insert_requests( request, pipeline, pipeline_params, @@ -136,6 +142,8 @@ impl OpenTelemetryProtocolHandler for Instance { OTLP_TRACES_ROWS.inc_by(rows as u64); if is_trace_v1_model { + self.reconcile_trace_column_types(&mut requests, &ctx) + .await?; self.handle_trace_inserts(requests, ctx) .await .map_err(BoxedError::new) @@ -200,3 +208,224 @@ impl OpenTelemetryProtocolHandler for Instance { Ok(outputs) } } + +impl Instance { + /// Picks the final datatype for one trace column. + /// + /// Existing table schema is authoritative when present. Otherwise we resolve the + /// request-local observed types using the shared trace coercion rules. + fn choose_trace_target_type( + observed_types: &[ColumnDataType], + existing_type: Option, + ) -> ServerResult> { + let Some(existing_type) = existing_type else { + return resolve_new_trace_column_type(observed_types.iter().copied()).map_err(|_| { + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .build() + }); + }; + + if observed_types.iter().copied().all(|request_type| { + request_type == existing_type + || is_supported_trace_coercion(request_type, existing_type) + }) { + Ok(Some(existing_type)) + } else { + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .fail() + } + } + + /// Coerce request column types and values to match the existing table schema + /// for compatible type pairs. Existing table schema wins when present; + /// otherwise the full request batch decides a stable target type. + async fn reconcile_trace_column_types( + &self, + requests: &mut RowInsertRequests, + ctx: &QueryContextRef, + ) -> ServerResult<()> { + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + + for req in &mut requests.inserts { + let table = self + .catalog_manager + .table(catalog, &schema, &req.table_name, None) + .await + .context(CatalogSnafu)?; + + let Some(rows) = req.rows.as_mut() else { + continue; + }; + + let table_schema = table.map(|table| table.schema()); + let mut pending_coercions = Vec::new(); + + for (col_idx, col_schema) in rows.schema.iter().enumerate() { + let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else { + continue; + }; + + let mut observed_types = Vec::new(); + push_observed_trace_type(&mut observed_types, current_type); + + // Scan the full request first so the final type decision is not affected + // by row order inside the batch. + for row in &rows.rows { + let Some(value) = row + .values + .get(col_idx) + .and_then(|value| value.value_data.as_ref()) + else { + continue; + }; + + let Some(value_type) = trace_value_datatype(value) else { + continue; + }; + push_observed_trace_type(&mut observed_types, value_type); + } + + let existing_type = table_schema + .as_ref() + .and_then(|schema| schema.column_schema_by_name(&col_schema.column_name)) + .and_then(|table_col| { + ColumnDataTypeWrapper::try_from(table_col.data_type.clone()) + .ok() + .map(|wrapper| wrapper.datatype()) + }); + + if !observed_types + .iter() + .copied() + .any(is_trace_reconcile_candidate_type) + && existing_type + .map(|datatype| !is_trace_reconcile_candidate_type(datatype)) + .unwrap_or(true) + { + continue; + } + + // Decide the final type once per column, then rewrite all affected cells + // together in one row pass below. + let Some(target_type) = + Self::choose_trace_target_type(&observed_types, existing_type).map_err( + |_| { + enrich_trace_reconcile_error( + &req.table_name, + &col_schema.column_name, + &observed_types, + existing_type, + ) + }, + )? + else { + continue; + }; + + if observed_types + .iter() + .all(|observed| *observed == target_type) + && col_schema.datatype == target_type as i32 + { + continue; + } + + pending_coercions.push((col_idx, target_type, col_schema.column_name.clone())); + } + + if pending_coercions.is_empty() { + continue; + } + + // Update schema metadata before mutating row values so both stay in sync. + for (col_idx, target_type, ..) in &pending_coercions { + rows.schema[*col_idx].datatype = *target_type as i32; + } + + // Apply all pending column rewrites in one row pass. + for row in &mut rows.rows { + for (col_idx, target_type, column_name) in &pending_coercions { + let Some(value) = row.values.get_mut(*col_idx) else { + continue; + }; + let Some(request_type) = + value.value_data.as_ref().and_then(trace_value_datatype) + else { + continue; + }; + if request_type == *target_type { + continue; + } + + value.value_data = coerce_value_data( + &value.value_data, + *target_type, + request_type, + ) + .map_err(|_| { + error::InvalidParameterSnafu { + reason: format!( + "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}", + column_name, req.table_name, request_type, target_type + ), + } + .build() + })?; + } + } + } + + Ok(()) + } +} + +fn enrich_trace_reconcile_error( + table_name: &str, + column_name: &str, + observed_types: &[ColumnDataType], + existing_type: Option, +) -> servers::error::Error { + let observed_types = observed_types + .iter() + .map(|datatype| format!("{datatype:?}")) + .collect::>() + .join(", "); + + error::InvalidParameterSnafu { + reason: match existing_type { + Some(existing_type) => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}", + column_name, table_name, observed_types, existing_type + ), + None => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}]", + column_name, table_name, observed_types + ), + }, + } + .build() +} + +/// Only these trace scalar types participate in reconciliation. Other column kinds +/// such as JSON and binary keep their original write path and schema checks. +fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool { + matches!( + datatype, + ColumnDataType::String + | ColumnDataType::Boolean + | ColumnDataType::Int64 + | ColumnDataType::Float64 + ) +} + +/// Keeps the observed type list small without depending on enum ordering. +fn push_observed_trace_type(observed_types: &mut Vec, datatype: ColumnDataType) { + if !observed_types.contains(&datatype) { + observed_types.push(datatype); + } +} diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index b724bb1d22..ca56f9b868 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod attributes; +pub mod coerce; pub mod span; pub mod v0; pub mod v1; diff --git a/src/servers/src/otlp/trace/coerce.rs b/src/servers/src/otlp/trace/coerce.rs new file mode 100644 index 0000000000..febec0fda9 --- /dev/null +++ b/src/servers/src/otlp/trace/coerce.rs @@ -0,0 +1,343 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::ColumnDataType; +use api::v1::value::ValueData; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TraceCoerceError { + Unsupported, +} + +// For now we support the following coercions: +// - Int64 to Float64 +// - Int64 to String +// - Float64 to String +// - Boolean to String +// The following coercions are supported with parse, which could fail: +// If fails, we will return TraceCoerceError::Unsupported. +// - String to Int64 +// - String to Float64 +// - String to Boolean +pub fn is_supported_trace_coercion( + request_type: ColumnDataType, + target_type: ColumnDataType, +) -> bool { + matches!( + (request_type, target_type), + (ColumnDataType::Int64, ColumnDataType::Float64) + | (ColumnDataType::Int64, ColumnDataType::String) + | (ColumnDataType::Float64, ColumnDataType::String) + | (ColumnDataType::Boolean, ColumnDataType::String) + | (ColumnDataType::String, ColumnDataType::Int64) + | (ColumnDataType::String, ColumnDataType::Float64) + | (ColumnDataType::String, ColumnDataType::Boolean) + ) +} + +pub fn coerce_value_data( + value: &Option, + target: ColumnDataType, + request_type: ColumnDataType, +) -> Result, TraceCoerceError> { + let Some(v) = value else { + return Ok(None); + }; + + let Some(value) = coerce_non_null_value(target, request_type, v) else { + return Err(TraceCoerceError::Unsupported); + }; + Ok(Some(value)) +} + +pub fn coerce_non_null_value( + target: ColumnDataType, + request_type: ColumnDataType, + value: &ValueData, +) -> Option { + match (request_type, target, value) { + (ColumnDataType::Int64, ColumnDataType::Float64, ValueData::I64Value(n)) => { + Some(ValueData::F64Value(*n as f64)) + } + (ColumnDataType::Int64, ColumnDataType::String, ValueData::I64Value(n)) => { + Some(ValueData::StringValue(n.to_string())) + } + (ColumnDataType::Float64, ColumnDataType::String, ValueData::F64Value(n)) => { + Some(ValueData::StringValue(n.to_string())) + } + (ColumnDataType::Boolean, ColumnDataType::String, ValueData::BoolValue(b)) => { + Some(ValueData::StringValue(b.to_string())) + } + (ColumnDataType::String, ColumnDataType::Int64, ValueData::StringValue(s)) => { + s.parse::().ok().map(ValueData::I64Value) + } + (ColumnDataType::String, ColumnDataType::Float64, ValueData::StringValue(s)) => { + s.parse::().ok().map(ValueData::F64Value) + } + (ColumnDataType::String, ColumnDataType::Boolean, ValueData::StringValue(s)) => { + s.parse::().ok().map(ValueData::BoolValue) + } + _ => None, + } +} + +pub fn trace_value_datatype(value: &ValueData) -> Option { + match value { + ValueData::StringValue(_) => Some(ColumnDataType::String), + ValueData::BoolValue(_) => Some(ColumnDataType::Boolean), + ValueData::I64Value(_) => Some(ColumnDataType::Int64), + ValueData::F64Value(_) => Some(ColumnDataType::Float64), + ValueData::BinaryValue(_) => Some(ColumnDataType::Binary), + _ => None, + } +} + +/// Resolves the final datatype for a new trace column when there is no existing +/// table schema to override the request-local observations. +pub fn resolve_new_trace_column_type( + observed_types: impl IntoIterator, +) -> Result, TraceCoerceError> { + let mut observed_types = observed_types.into_iter().collect::>(); + observed_types.dedup(); + + match observed_types.as_slice() { + [] => Ok(None), + [datatype] => Ok(Some(*datatype)), + [_, _] + if observed_types.contains(&ColumnDataType::String) + && observed_types.contains(&ColumnDataType::Boolean) => + { + Ok(Some(ColumnDataType::Boolean)) + } + [_, _] + if observed_types.contains(&ColumnDataType::String) + && observed_types.contains(&ColumnDataType::Int64) => + { + Ok(Some(ColumnDataType::Int64)) + } + [_, _] + if observed_types.contains(&ColumnDataType::String) + && observed_types.contains(&ColumnDataType::Float64) => + { + Ok(Some(ColumnDataType::Float64)) + } + [_, _] + if observed_types.contains(&ColumnDataType::Int64) + && observed_types.contains(&ColumnDataType::Float64) => + { + Ok(Some(ColumnDataType::Float64)) + } + _ => Err(TraceCoerceError::Unsupported), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_coerce_int64_to_float64() { + let result = coerce_value_data( + &Some(ValueData::I64Value(42)), + ColumnDataType::Float64, + ColumnDataType::Int64, + ); + assert_eq!(result, Ok(Some(ValueData::F64Value(42.0)))); + } + + #[test] + fn test_coerce_string_to_int64() { + let result = coerce_value_data( + &Some(ValueData::StringValue("123".to_string())), + ColumnDataType::Int64, + ColumnDataType::String, + ); + assert_eq!(result, Ok(Some(ValueData::I64Value(123)))); + } + + #[test] + fn test_coerce_int64_to_string() { + let result = coerce_value_data( + &Some(ValueData::I64Value(123)), + ColumnDataType::String, + ColumnDataType::Int64, + ); + assert_eq!(result, Ok(Some(ValueData::StringValue("123".to_string())))); + } + + #[test] + fn test_coerce_string_to_float64() { + let result = coerce_value_data( + &Some(ValueData::StringValue("1.5".to_string())), + ColumnDataType::Float64, + ColumnDataType::String, + ); + assert_eq!(result, Ok(Some(ValueData::F64Value(1.5)))); + } + + #[test] + fn test_coerce_float64_to_string() { + let result = coerce_value_data( + &Some(ValueData::F64Value(1.5)), + ColumnDataType::String, + ColumnDataType::Float64, + ); + assert_eq!(result, Ok(Some(ValueData::StringValue("1.5".to_string())))); + } + + #[test] + fn test_coerce_string_to_boolean() { + let result = coerce_value_data( + &Some(ValueData::StringValue("true".to_string())), + ColumnDataType::Boolean, + ColumnDataType::String, + ); + assert_eq!(result, Ok(Some(ValueData::BoolValue(true)))); + + let result = coerce_value_data( + &Some(ValueData::StringValue("false".to_string())), + ColumnDataType::Boolean, + ColumnDataType::String, + ); + assert_eq!(result, Ok(Some(ValueData::BoolValue(false)))); + } + + #[test] + fn test_coerce_boolean_to_string() { + let result = coerce_value_data( + &Some(ValueData::BoolValue(true)), + ColumnDataType::String, + ColumnDataType::Boolean, + ); + assert_eq!(result, Ok(Some(ValueData::StringValue("true".to_string())))); + } + + #[test] + fn test_coerce_unparsable_string() { + let result = coerce_value_data( + &Some(ValueData::StringValue("not_a_number".to_string())), + ColumnDataType::Int64, + ColumnDataType::String, + ); + assert_eq!(result, Err(TraceCoerceError::Unsupported)); + } + + #[test] + fn test_coerce_float64_to_int64_not_supported() { + let result = coerce_value_data( + &Some(ValueData::F64Value(1.5)), + ColumnDataType::Int64, + ColumnDataType::Float64, + ); + assert_eq!(result, Err(TraceCoerceError::Unsupported)); + } + + #[test] + fn test_coerce_none_value() { + let result = coerce_value_data(&None, ColumnDataType::Float64, ColumnDataType::Int64); + assert_eq!(result, Ok(None)); + } + + #[test] + fn test_is_supported_trace_coercion() { + assert!(is_supported_trace_coercion( + ColumnDataType::Int64, + ColumnDataType::Float64 + )); + assert!(is_supported_trace_coercion( + ColumnDataType::Int64, + ColumnDataType::String + )); + assert!(is_supported_trace_coercion( + ColumnDataType::Float64, + ColumnDataType::String + )); + assert!(is_supported_trace_coercion( + ColumnDataType::Boolean, + ColumnDataType::String + )); + assert!(is_supported_trace_coercion( + ColumnDataType::String, + ColumnDataType::Int64 + )); + assert!(is_supported_trace_coercion( + ColumnDataType::String, + ColumnDataType::Float64 + )); + assert!(is_supported_trace_coercion( + ColumnDataType::String, + ColumnDataType::Boolean + )); + assert!(!is_supported_trace_coercion( + ColumnDataType::Binary, + ColumnDataType::Json + )); + } + + #[test] + fn test_trace_value_datatype() { + assert_eq!( + trace_value_datatype(&ValueData::StringValue("x".to_string())), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_value_datatype(&ValueData::BoolValue(true)), + Some(ColumnDataType::Boolean) + ); + assert_eq!( + trace_value_datatype(&ValueData::I64Value(1)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_value_datatype(&ValueData::F64Value(1.0)), + Some(ColumnDataType::Float64) + ); + assert_eq!( + trace_value_datatype(&ValueData::BinaryValue(vec![1_u8])), + Some(ColumnDataType::Binary) + ); + } + + #[test] + fn test_resolve_new_trace_column_type() { + assert_eq!( + resolve_new_trace_column_type([ColumnDataType::Int64]), + Ok(Some(ColumnDataType::Int64)) + ); + assert_eq!( + resolve_new_trace_column_type([ColumnDataType::String, ColumnDataType::Int64]), + Ok(Some(ColumnDataType::Int64)) + ); + assert_eq!( + resolve_new_trace_column_type([ColumnDataType::String, ColumnDataType::Float64]), + Ok(Some(ColumnDataType::Float64)) + ); + assert_eq!( + resolve_new_trace_column_type([ColumnDataType::String, ColumnDataType::Boolean]), + Ok(Some(ColumnDataType::Boolean)) + ); + assert_eq!( + resolve_new_trace_column_type([ColumnDataType::Int64, ColumnDataType::Float64]), + Ok(Some(ColumnDataType::Float64)) + ); + assert_eq!( + resolve_new_trace_column_type([ + ColumnDataType::String, + ColumnDataType::Int64, + ColumnDataType::Float64, + ]), + Err(TraceCoerceError::Unsupported) + ); + } +} diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs index 86f8229769..11e986de04 100644 --- a/src/servers/src/otlp/trace/v1.rs +++ b/src/servers/src/otlp/trace/v1.rs @@ -230,7 +230,7 @@ fn write_trace_operations_to_row( Ok(()) } -fn write_attributes( +pub(crate) fn write_attributes( writer: &mut TableData, prefix: &str, attributes: Attributes, @@ -247,44 +247,40 @@ fn write_attributes( let key = format!("{}.{}", prefix, key_suffix); match attr.value.and_then(|v| v.value) { Some(OtlpValue::StringValue(v)) => { - row_writer::write_fields( - writer, - std::iter::once(make_string_column_data(&key, Some(v))), + // Keep the raw request value here. Mixed trace types are reconciled later + // in the frontend once we can also see the existing table schema. + writer.write_field_unchecked( + &key, + ColumnDataType::String, + Some(ValueData::StringValue(v)), row, - )?; + ); } Some(OtlpValue::BoolValue(v)) => { - row_writer::write_fields( - writer, - std::iter::once(make_column_data( - &key, - ColumnDataType::Boolean, - Some(ValueData::BoolValue(v)), - )), + // Do not coerce or promote types while building the request-local rows. + writer.write_field_unchecked( + &key, + ColumnDataType::Boolean, + Some(ValueData::BoolValue(v)), row, - )?; + ); } Some(OtlpValue::IntValue(v)) => { - row_writer::write_fields( - writer, - std::iter::once(make_column_data( - &key, - ColumnDataType::Int64, - Some(ValueData::I64Value(v)), - )), + // Preserving the original value avoids order-dependent behavior inside one batch. + writer.write_field_unchecked( + &key, + ColumnDataType::Int64, + Some(ValueData::I64Value(v)), row, - )?; + ); } Some(OtlpValue::DoubleValue(v)) => { - row_writer::write_fields( - writer, - std::iter::once(make_column_data( - &key, - ColumnDataType::Float64, - Some(ValueData::F64Value(v)), - )), + writer.write_field_unchecked( + &key, + ColumnDataType::Float64, + Some(ValueData::F64Value(v)), row, - )?; + ); } Some(OtlpValue::ArrayValue(v)) => row_writer::write_json( writer, @@ -315,3 +311,214 @@ fn write_attributes( Ok(()) } + +#[cfg(test)] +mod tests { + use api::v1::value::ValueData; + use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue; + use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; + + use super::*; + use crate::otlp::trace::attributes::Attributes; + use crate::row_writer::TableData; + + fn make_kv(key: &str, value: OtlpValue) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(AnyValue { value: Some(value) }), + } + } + + #[test] + fn test_keep_mixed_numeric_values_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv("val", OtlpValue::DoubleValue(1.5))]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv("val", OtlpValue::IntValue(42))]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::Float64 as i32); + + assert_eq!( + rows[0].values[col_idx].value_data, + Some(ValueData::F64Value(1.5)) + ); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::I64Value(42)) + ); + } + + #[test] + fn test_keep_mixed_string_and_int_values_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv("val", OtlpValue::IntValue(10))]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv( + "val", + OtlpValue::StringValue("20".to_string()), + )]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::Int64 as i32); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::StringValue("20".to_string())) + ); + } + + #[test] + fn test_keep_first_seen_schema_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv( + "val", + OtlpValue::StringValue("10".to_string()), + )]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv("val", OtlpValue::IntValue(20))]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::String as i32); + assert_eq!( + rows[0].values[col_idx].value_data, + Some(ValueData::StringValue("10".to_string())) + ); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::I64Value(20)) + ); + } + + #[test] + fn test_keep_mixed_string_and_float_values_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv("val", OtlpValue::DoubleValue(1.5))]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv( + "val", + OtlpValue::StringValue("1.5".to_string()), + )]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::Float64 as i32); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::StringValue("1.5".to_string())) + ); + } + + #[test] + fn test_keep_mixed_string_and_bool_values_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv( + "val", + OtlpValue::StringValue("true".to_string()), + )]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv("val", OtlpValue::BoolValue(false))]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::String as i32); + assert_eq!( + rows[0].values[col_idx].value_data, + Some(ValueData::StringValue("true".to_string())) + ); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::BoolValue(false)) + ); + } + + #[test] + fn test_keep_mixed_binary_and_string_values_until_frontend_reconciliation() { + let mut writer = TableData::new(4, 2); + + let attrs1 = Attributes::from(vec![make_kv( + "val", + OtlpValue::BytesValue(vec![1_u8, 2, 3]), + )]); + let mut row1 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs1, &mut row1).unwrap(); + writer.add_row(row1); + + let attrs2 = Attributes::from(vec![make_kv( + "val", + OtlpValue::StringValue("false".to_string()), + )]); + let mut row2 = writer.alloc_one_row(); + write_attributes(&mut writer, "attr", attrs2, &mut row2).unwrap(); + writer.add_row(row2); + + let (schema, rows) = writer.into_schema_and_rows(); + let col_idx = schema + .iter() + .position(|c| c.column_name == "attr.val") + .unwrap(); + assert_eq!(schema[col_idx].datatype, ColumnDataType::Binary as i32); + assert_eq!( + rows[0].values[col_idx].value_data, + Some(ValueData::BinaryValue(vec![1_u8, 2, 3])) + ); + assert_eq!( + rows[1].values[col_idx].value_data, + Some(ValueData::StringValue("false".to_string())) + ); + } + // Conversion matrix coverage lives in the shared coercion helper tests. +} diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index ec439a8659..fca2c21b41 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -77,6 +77,35 @@ impl TableData { pub fn into_schema_and_rows(self) -> (Vec, Vec) { (self.schema, self.rows) } + + /// Writes a field value without enforcing that later writes use the same datatype + /// as the first-seen schema entry. + /// + /// The OTLP trace v1 path uses this to preserve raw mixed values inside one request + /// so the frontend can reconcile them later against both the full batch and the + /// existing table schema. + pub fn write_field_unchecked( + &mut self, + name: impl ToString, + datatype: ColumnDataType, + value: Option, + one_row: &mut Vec, + ) { + let name = name.to_string(); + if let Some(index) = self.column_indexes.get(&name).copied() { + one_row[index].value_data = value; + } else { + let index = self.schema.len(); + self.schema.push(ColumnSchema { + column_name: name.clone(), + datatype: datatype as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }); + self.column_indexes.insert(name, index); + one_row.push(Value { value_data: value }); + } + } } pub struct MultiTableData { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 933fcadf6b..c0d858a592 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -5283,6 +5283,315 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; + let coercion_table_name = "trace_type_coercion"; + let coercion_req = make_trace_v1_request( + "type-coercion", + vec![make_trace_v1_span( + "00000000000000000000000000000001", + "0000000000000001", + "coercion-seed", + 1_736_480_942_444_376_000, + 1_736_480_942_444_499_000, + vec![ + make_double_attr("attr_float", 1.5), + make_int_attr("attr_int", 10), + make_bool_attr("attr_bool", true), + ], + )], + ); + let res = send_trace_v1_req(&client, coercion_table_name, coercion_req, false).await; + assert_eq!(StatusCode::OK, res.status()); + + let coercion_req = make_trace_v1_request( + "type-coercion", + vec![make_trace_v1_span( + "00000000000000000000000000000002", + "0000000000000002", + "coercion-apply", + 1_736_480_942_444_589_000, + 1_736_480_942_444_712_000, + vec![ + make_int_attr("attr_float", 2), + make_string_attr("attr_int", "20"), + make_string_attr("attr_bool", "false"), + ], + )], + ); + let res = send_trace_v1_req(&client, coercion_table_name, coercion_req, false).await; + assert_eq!(StatusCode::OK, res.status()); + + let string_target_table_name = "trace_type_coercion_to_string"; + let string_target_seed_req = make_trace_v1_request( + "type-coercion-string", + vec![make_trace_v1_span( + "00000000000000000000000000000021", + "0000000000000021", + "string-target-seed", + 1_736_480_942_444_720_000, + 1_736_480_942_444_820_000, + vec![ + make_string_attr("attr_int", "seed"), + make_string_attr("attr_float", "seed"), + make_string_attr("attr_bool", "seed"), + ], + )], + ); + let res = send_trace_v1_req( + &client, + string_target_table_name, + string_target_seed_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let string_target_req = make_trace_v1_request( + "type-coercion-string", + vec![make_trace_v1_span( + "00000000000000000000000000000022", + "0000000000000022", + "string-target-apply", + 1_736_480_942_444_830_000, + 1_736_480_942_444_930_000, + vec![ + make_int_attr("attr_int", 20), + make_double_attr("attr_float", 2.5), + make_bool_attr("attr_bool", false), + ], + )], + ); + let res = send_trace_v1_req(&client, string_target_table_name, string_target_req, false).await; + assert_eq!(StatusCode::OK, res.status()); + + validate_data( + "otlp_traces_v1_type_coercion_to_string_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_bool\", \"span_attributes.attr_float\", \"span_attributes.attr_int\" from {} order by trace_id;", + string_target_table_name + ), + r#"[["00000000000000000000000000000021","seed","seed","seed"],["00000000000000000000000000000022","false","2.5","20"]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_type_coercion_to_string_schema", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_coercion_to_string' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;", + r#"[["span_attributes.attr_bool","string","FIELD"],["span_attributes.attr_float","string","FIELD"],["span_attributes.attr_int","string","FIELD"]]"#, + ) + .await; + + let intra_batch_prefer_non_string_table_name = "trace_type_prefer_non_string"; + let intra_batch_prefer_non_string_req = make_trace_v1_request( + "type-prefer-non-string", + vec![ + make_trace_v1_span( + "00000000000000000000000000000031", + "0000000000000031", + "prefer-non-string-seed", + 1_736_480_942_444_940_000, + 1_736_480_942_445_040_000, + vec![ + make_string_attr("attr_int", "10"), + make_string_attr("attr_float", "1.5"), + make_string_attr("attr_bool", "true"), + ], + ), + make_trace_v1_span( + "00000000000000000000000000000032", + "0000000000000032", + "prefer-non-string-apply", + 1_736_480_942_445_050_000, + 1_736_480_942_445_150_000, + vec![ + make_int_attr("attr_int", 20), + make_double_attr("attr_float", 2.5), + make_bool_attr("attr_bool", false), + ], + ), + ], + ); + let res = send_trace_v1_req( + &client, + intra_batch_prefer_non_string_table_name, + intra_batch_prefer_non_string_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + validate_data( + "otlp_traces_v1_prefer_non_string_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_bool\", \"span_attributes.attr_float\", \"span_attributes.attr_int\" from {} order by trace_id;", + intra_batch_prefer_non_string_table_name + ), + r#"[["00000000000000000000000000000031",true,1.5,10],["00000000000000000000000000000032",false,2.5,20]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_prefer_non_string_schema", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_prefer_non_string' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;", + r#"[["span_attributes.attr_bool","boolean","FIELD"],["span_attributes.attr_float","double","FIELD"],["span_attributes.attr_int","bigint","FIELD"]]"#, + ) + .await; + + let existing_float_table_name = "trace_type_existing_float_prefers_schema"; + let existing_float_seed_req = make_trace_v1_request( + "type-existing-float", + vec![make_trace_v1_span( + "00000000000000000000000000000041", + "0000000000000041", + "existing-float-seed", + 1_736_480_942_445_160_000, + 1_736_480_942_445_260_000, + vec![make_double_attr("attr_num", 1.25)], + )], + ); + let res = send_trace_v1_req( + &client, + existing_float_table_name, + existing_float_seed_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let existing_float_req = make_trace_v1_request( + "type-existing-float", + vec![ + make_trace_v1_span( + "00000000000000000000000000000042", + "0000000000000042", + "existing-float-int-first", + 1_736_480_942_445_270_000, + 1_736_480_942_445_370_000, + vec![make_int_attr("attr_num", 2)], + ), + make_trace_v1_span( + "00000000000000000000000000000043", + "0000000000000043", + "existing-float-float-later", + 1_736_480_942_445_380_000, + 1_736_480_942_445_480_000, + vec![make_double_attr("attr_num", 3.5)], + ), + ], + ); + let res = send_trace_v1_req( + &client, + existing_float_table_name, + existing_float_req, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + validate_data( + "otlp_traces_v1_existing_float_prefers_schema_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_num\" from {} order by trace_id;", + existing_float_table_name + ), + r#"[["00000000000000000000000000000041",1.25],["00000000000000000000000000000042",2.0],["00000000000000000000000000000043",3.5]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_existing_float_prefers_schema_type", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_existing_float_prefers_schema' and column_name = 'span_attributes.attr_num';", + r#"[["span_attributes.attr_num","double","FIELD"]]"#, + ) + .await; + + validate_data( + "otlp_traces_v1_type_coercion_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_int\", \"span_attributes.attr_bool\" from {} order by trace_id;", + coercion_table_name + ), + r#"[["00000000000000000000000000000001",10,true],["00000000000000000000000000000002",20,false]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_type_coercion_float_sum", + &client, + &format!( + "select sum(\"span_attributes.attr_float\") from {};", + coercion_table_name + ), + r#"[[3.5]]"#, + ) + .await; + validate_data( + "otlp_traces_v1_type_coercion_schema", + &client, + "select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_coercion' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;", + r#"[["span_attributes.attr_bool","boolean","FIELD"],["span_attributes.attr_float","double","FIELD"],["span_attributes.attr_int","bigint","FIELD"]]"#, + ) + .await; + + let abort_table_name = "trace_type_abort"; + let abort_seed_req = make_trace_v1_request( + "type-abort", + vec![make_trace_v1_span( + "00000000000000000000000000000011", + "0000000000000011", + "abort-seed", + 1_736_480_942_444_800_000, + 1_736_480_942_444_900_000, + vec![make_int_attr("attr_int", 10)], + )], + ); + let res = send_trace_v1_req(&client, abort_table_name, abort_seed_req, false).await; + assert_eq!(StatusCode::OK, res.status()); + + let abort_req = make_trace_v1_request( + "type-abort", + vec![ + make_trace_v1_span( + "00000000000000000000000000000012", + "0000000000000012", + "abort-parseable", + 1_736_480_942_445_000_000, + 1_736_480_942_445_100_000, + vec![make_string_attr("attr_int", "20")], + ), + make_trace_v1_span( + "00000000000000000000000000000013", + "0000000000000013", + "abort-unparsable", + 1_736_480_942_445_200_000, + 1_736_480_942_445_300_000, + vec![make_string_attr("attr_int", "not_a_number")], + ), + ], + ); + let res = send_trace_v1_req(&client, abort_table_name, abort_req, false).await; + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + let body: Value = res.json().await; + assert!( + body["error"].as_str().unwrap().contains( + "failed to coerce trace column 'span_attributes.attr_int' in table 'trace_type_abort'" + ), + "unexpected error body: {body}" + ); + + validate_data( + "otlp_traces_v1_type_abort_rows", + &client, + &format!( + "select trace_id, \"span_attributes.attr_int\" from {} order by trace_id;", + abort_table_name + ), + r#"[["00000000000000000000000000000011",10]]"#, + ) + .await; + guard.remove_all().await; } @@ -7515,6 +7824,115 @@ async fn send_req( req.header("content-length", len).send().await } +async fn send_trace_v1_req( + client: &TestClient, + table_name: &str, + req: ExportTraceServiceRequest, + with_gzip: bool, +) -> TestResponse { + send_req( + client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_str(table_name).unwrap(), + ), + ], + "/v1/otlp/v1/traces", + req.encode_to_vec(), + with_gzip, + ) + .await +} + +fn make_trace_v1_request(service_name: &str, spans: Vec) -> ExportTraceServiceRequest { + serde_json::from_value(json!({ + "resourceSpans": [{ + "resource": { + "attributes": [{ + "key": "service.name", + "value": { "stringValue": service_name } + }] + }, + "scopeSpans": [{ + "scope": { + "name": "trace-v1-type-tests" + }, + "spans": spans + }], + "schemaUrl": "https://opentelemetry.io/schemas/1.4.0" + }] + })) + .unwrap() +} + +fn make_trace_v1_span( + trace_id: &str, + span_id: &str, + name: &str, + start_time_unix_nano: i64, + end_time_unix_nano: i64, + attributes: Vec, +) -> Value { + json!({ + "traceId": trace_id, + "spanId": span_id, + "name": name, + "kind": 2, + "startTimeUnixNano": start_time_unix_nano.to_string(), + "endTimeUnixNano": end_time_unix_nano.to_string(), + "attributes": attributes, + "status": { + "message": "", + "code": 0 + } + }) +} + +fn make_string_attr(key: &str, value: &str) -> Value { + json!({ + "key": key, + "value": { + "stringValue": value + } + }) +} + +fn make_int_attr(key: &str, value: i64) -> Value { + json!({ + "key": key, + "value": { + "intValue": value.to_string() + } + }) +} + +fn make_double_attr(key: &str, value: f64) -> Value { + json!({ + "key": key, + "value": { + "doubleValue": value + } + }) +} + +fn make_bool_attr(key: &str, value: bool) -> Value { + json!({ + "key": key, + "value": { + "boolValue": value + } + }) +} + fn get_rows_from_output(output: &str) -> String { let resp: Value = serde_json::from_str(output).unwrap(); resp.get("output")