From 6fcca6e0d63f7e65a0dc42b0301761a52f5ea5e3 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 14 Apr 2026 17:12:41 +0800 Subject: [PATCH] feat: implement trace type whitelist (#7930) * feat: implement trace type whitelist Signed-off-by: shuiyisong * chore: use opentelemetry_semantic_conventions for key name Signed-off-by: shuiyisong * chore: add ref doc in the comments Signed-off-by: shuiyisong * fix: fmt toml Signed-off-by: shuiyisong * chore: introduce trace_semconv.rs for holding the mapping Signed-off-by: shuiyisong * chore: update key list Signed-off-by: shuiyisong * fix: fmt Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- Cargo.lock | 9 +- src/frontend/Cargo.toml | 1 + src/frontend/src/instance/otlp.rs | 29 +- .../src/instance/otlp/trace_semconv.rs | 308 ++++++++++++++++++ src/frontend/src/instance/otlp/trace_types.rs | 184 ++++++++++- 5 files changed, 514 insertions(+), 17 deletions(-) create mode 100644 src/frontend/src/instance/otlp/trace_semconv.rs diff --git a/Cargo.lock b/Cargo.lock index cb14133c53..c400d77873 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2756,7 +2756,7 @@ dependencies = [ "once_cell", "opentelemetry 0.30.0", "opentelemetry-otlp", - "opentelemetry-semantic-conventions", + "opentelemetry-semantic-conventions 0.30.0", "opentelemetry_sdk 0.30.0", "parking_lot 0.12.4", "prometheus 0.14.0", @@ -5257,6 +5257,7 @@ dependencies = [ "meta-srv", "num_cpus", "opentelemetry-proto 0.31.0", + "opentelemetry-semantic-conventions 0.31.0", "operator", "otel-arrow-rust", "partition", @@ -8997,6 +8998,12 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.30.0" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 1b0ffe6e29..aa9276a0cb 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -58,6 +58,7 @@ log-query.workspace = true meta-client.workspace = true num_cpus.workspace = true opentelemetry-proto.workspace = true +opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } operator.workspace = true otel-arrow-rust.workspace = true partition.workspace = true diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 75168b3b9a..434a413fed 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -46,6 +46,7 @@ use snafu::{IntoError, ResultExt}; use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; +use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type; use crate::instance::otlp::trace_types::{ PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites, @@ -54,6 +55,7 @@ use crate::metrics::{ OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS, }; +pub mod trace_semconv; pub mod trace_types; const TRACE_INGEST_CHUNK_SIZE: usize = 64; @@ -678,6 +680,7 @@ impl Instance { .ok() .map(|wrapper| wrapper.datatype()) }); + let fixed_type = trace_semconv_fixed_type(&col_schema.column_name); if !observed_types .iter() @@ -686,23 +689,27 @@ impl Instance { && existing_type .map(|datatype| !is_trace_reconcile_candidate_type(datatype)) .unwrap_or(true) + && fixed_type.is_none() { continue; } // Decide the final type once per column, then rewrite all affected cells // together in one row pass below. - let Some(decision) = - choose_trace_reconcile_decision(&observed_types, existing_type).map_err( - |_| { - enrich_trace_reconcile_error( - &req.table_name, - &col_schema.column_name, - &observed_types, - existing_type, - ) - }, - )? + let Some(decision) = choose_trace_reconcile_decision( + &col_schema.column_name, + &observed_types, + existing_type, + ) + .map_err(|_| { + enrich_trace_reconcile_error( + &req.table_name, + &col_schema.column_name, + &observed_types, + existing_type, + fixed_type, + ) + })? else { continue; }; diff --git a/src/frontend/src/instance/otlp/trace_semconv.rs b/src/frontend/src/instance/otlp/trace_semconv.rs new file mode 100644 index 0000000000..70ad5a0676 --- /dev/null +++ b/src/frontend/src/instance/otlp/trace_semconv.rs @@ -0,0 +1,308 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::ColumnDataType; +use opentelemetry_semantic_conventions::{attribute, resource}; + +/// Returns fixed scalar types for flattened trace semconv columns. +/// +/// The mapping is maintained from the official OpenTelemetry semantic +/// conventions docs under `docs/` and `docs/registry/attributes/` in: +/// https://github.com/open-telemetry/semantic-conventions/tree/main/docs +/// +/// Only attributes whose docs mark them as `Stable` or `Release Candidate` are +/// included here. `Development` and lower-stability attributes must keep the +/// dynamic reconciliation path instead. +pub(super) fn trace_semconv_fixed_type(column_name: &str) -> Option { + if let Some(resource_attribute) = column_name.strip_prefix("resource_attributes.") { + return match resource_attribute { + // Service resource attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/service.md + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/service.md + resource::SERVICE_NAME + | resource::SERVICE_INSTANCE_ID + | resource::SERVICE_NAMESPACE + | resource::SERVICE_VERSION => Some(ColumnDataType::String), + + // Telemetry SDK resource attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/README.md + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/telemetry.md + resource::TELEMETRY_SDK_LANGUAGE + | resource::TELEMETRY_SDK_NAME + | resource::TELEMETRY_SDK_VERSION => Some(ColumnDataType::String), + + // Container resource attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/container.md + resource::CONTAINER_ID => Some(ColumnDataType::String), + + // Browser resource attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/browser.md + resource::USER_AGENT_ORIGINAL => Some(ColumnDataType::String), + + // Kubernetes resource attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s/README.md + resource::K8S_CLUSTER_NAME + | resource::K8S_CLUSTER_UID + | resource::K8S_CONTAINER_NAME + | resource::K8S_CRONJOB_NAME + | resource::K8S_CRONJOB_UID + | resource::K8S_DAEMONSET_NAME + | resource::K8S_DAEMONSET_UID + | resource::K8S_DEPLOYMENT_NAME + | resource::K8S_DEPLOYMENT_UID + | resource::K8S_JOB_NAME + | resource::K8S_JOB_UID + | resource::K8S_NAMESPACE_NAME + | resource::K8S_NODE_NAME + | resource::K8S_NODE_UID + | resource::K8S_POD_NAME + | resource::K8S_POD_UID + | resource::K8S_REPLICASET_NAME + | resource::K8S_REPLICASET_UID + | resource::K8S_STATEFULSET_NAME + | resource::K8S_STATEFULSET_UID + // The current docs include these `k8s.pod.*` keys, but crate 0.31 + // does not export constants for them yet. + | "k8s.pod.hostname" + | "k8s.pod.ip" + | "k8s.pod.start_time" => Some(ColumnDataType::String), + resource::K8S_CONTAINER_RESTART_COUNT => Some(ColumnDataType::Int64), + + _ => None, + }; + } + + if let Some(span_attribute) = column_name.strip_prefix("span_attributes.") { + return match span_attribute { + // General client, server, and network attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/attributes.md + attribute::CLIENT_ADDRESS + | attribute::SERVER_ADDRESS + | attribute::NETWORK_LOCAL_ADDRESS + | attribute::NETWORK_PEER_ADDRESS + | attribute::NETWORK_PROTOCOL_NAME + | attribute::NETWORK_PROTOCOL_VERSION + | attribute::NETWORK_TRANSPORT + | attribute::NETWORK_TYPE => Some(ColumnDataType::String), + attribute::CLIENT_PORT + | attribute::SERVER_PORT + | attribute::NETWORK_LOCAL_PORT + | attribute::NETWORK_PEER_PORT => Some(ColumnDataType::Int64), + + // HTTP attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/http.md + attribute::HTTP_REQUEST_METHOD + | attribute::HTTP_REQUEST_METHOD_ORIGINAL + | attribute::HTTP_ROUTE + | attribute::URL_FULL + | attribute::URL_PATH + | attribute::URL_QUERY + | attribute::URL_SCHEME + | attribute::USER_AGENT_ORIGINAL => Some(ColumnDataType::String), + attribute::HTTP_REQUEST_RESEND_COUNT | attribute::HTTP_RESPONSE_STATUS_CODE => { + Some(ColumnDataType::Int64) + } + + // RPC attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/rpc.md + attribute::RPC_METHOD + | attribute::RPC_SYSTEM + // The current docs renamed this attribute to `rpc.system.name`, + // but crate 0.31 still exports `RPC_SYSTEM = "rpc.system"`. + | "rpc.system.name" + | "rpc.method_original" + | "rpc.response.status_code" => Some(ColumnDataType::String), + + // General database attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/db.md + attribute::DB_COLLECTION_NAME + | attribute::DB_NAMESPACE + | attribute::DB_OPERATION_NAME + | attribute::DB_QUERY_SUMMARY + | attribute::DB_QUERY_TEXT + | attribute::DB_RESPONSE_STATUS_CODE + | attribute::DB_STORED_PROCEDURE_NAME + | attribute::DB_SYSTEM_NAME => Some(ColumnDataType::String), + attribute::DB_OPERATION_BATCH_SIZE => Some(ColumnDataType::Int64), + + // Error attributes: + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/error.md + attribute::ERROR_TYPE => Some(ColumnDataType::String), + + _ => None, + }; + } + + None +} + +#[cfg(test)] +mod tests { + use api::v1::ColumnDataType; + use opentelemetry_semantic_conventions::{attribute, resource}; + + use super::trace_semconv_fixed_type; + + fn resource_column(key: &str) -> String { + format!("resource_attributes.{key}") + } + + fn span_column(key: &str) -> String { + format!("span_attributes.{key}") + } + + #[test] + fn test_trace_semconv_fixed_type_includes_stable_service_key() { + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::SERVICE_NAME)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::SERVICE_VERSION)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::SERVICE_INSTANCE_ID)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::SERVICE_NAMESPACE)), + Some(ColumnDataType::String) + ); + } + + #[test] + fn test_trace_semconv_fixed_type_includes_http_server_and_error_keys() { + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::HTTP_RESPONSE_STATUS_CODE)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::SERVER_PORT)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::ERROR_TYPE)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::CLIENT_ADDRESS)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::CLIENT_PORT)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::URL_FULL)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::URL_PATH)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::URL_QUERY)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::URL_SCHEME)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::USER_AGENT_ORIGINAL)), + Some(ColumnDataType::String) + ); + } + + #[test] + fn test_trace_semconv_fixed_type_includes_rc_rpc_key() { + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::RPC_SYSTEM)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type("span_attributes.rpc.system.name"), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column("rpc.response.status_code")), + Some(ColumnDataType::String) + ); + } + + #[test] + fn test_trace_semconv_fixed_type_includes_db_and_network_keys() { + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::DB_SYSTEM_NAME)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::DB_OPERATION_BATCH_SIZE)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_semconv_fixed_type(&span_column(attribute::NETWORK_PEER_PORT)), + Some(ColumnDataType::Int64) + ); + } + + #[test] + fn test_trace_semconv_fixed_type_includes_resource_semconv_keys() { + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::CONTAINER_ID)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::K8S_CONTAINER_RESTART_COUNT)), + Some(ColumnDataType::Int64) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_LANGUAGE)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_NAME)), + Some(ColumnDataType::String) + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_VERSION)), + Some(ColumnDataType::String) + ); + } + + #[test] + fn test_trace_semconv_fixed_type_excludes_development_keys() { + assert_eq!( + trace_semconv_fixed_type(&span_column("messaging.system")), + None + ); + assert_eq!( + trace_semconv_fixed_type(&span_column("rpc.request.metadata.x-request-id")), + None + ); + assert_eq!( + trace_semconv_fixed_type(&resource_column("k8s.pod.label.app")), + None + ); + } + + #[test] + fn test_trace_semconv_fixed_type_unknown_key() { + assert_eq!(trace_semconv_fixed_type(&span_column("custom.attr")), None); + } +} diff --git a/src/frontend/src/instance/otlp/trace_types.rs b/src/frontend/src/instance/otlp/trace_types.rs index 0be3df550e..86eaf9c6b1 100644 --- a/src/frontend/src/instance/otlp/trace_types.rs +++ b/src/frontend/src/instance/otlp/trace_types.rs @@ -19,6 +19,8 @@ use servers::otlp::trace::coerce::{ trace_value_datatype, }; +use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) enum TraceReconcileDecision { UseExisting(ColumnDataType), @@ -51,9 +53,14 @@ pub(super) struct PendingTraceColumnRewrite { /// Existing table schema is authoritative unless the only incompatible case is /// widening an existing Int64 column to Float64 for incoming Int64/Float64 data. pub(super) fn choose_trace_reconcile_decision( + column_name: &str, observed_types: &[ColumnDataType], existing_type: Option, ) -> ServerResult> { + if let Some(fixed_type) = trace_semconv_fixed_type(column_name) { + return choose_fixed_trace_reconcile_decision(fixed_type, observed_types, existing_type); + } + let Some(existing_type) = existing_type else { return resolve_new_trace_column_type(observed_types.iter().copied()) .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal)) @@ -91,6 +98,37 @@ pub(super) fn choose_trace_reconcile_decision( .fail() } +fn choose_fixed_trace_reconcile_decision( + fixed_type: ColumnDataType, + observed_types: &[ColumnDataType], + existing_type: Option, +) -> ServerResult> { + let Some(existing_type) = existing_type else { + return Ok(Some(TraceReconcileDecision::UseRequestLocal(fixed_type))); + }; + + if existing_type == fixed_type { + return Ok(Some(TraceReconcileDecision::UseExisting(fixed_type))); + } + + if fixed_type == ColumnDataType::Float64 + && existing_type == ColumnDataType::Int64 + && observed_types.iter().all(|observed_type| { + matches!( + observed_type, + ColumnDataType::Int64 | ColumnDataType::Float64 + ) + }) + { + return Ok(Some(TraceReconcileDecision::AlterExistingTo(fixed_type))); + } + + error::InvalidParameterSnafu { + reason: "unsupported trace type mix".to_string(), + } + .fail() +} + /// Validate all pending trace column rewrites before any schema mutation happens. pub(super) fn validate_trace_column_rewrites( rows: &[Row], @@ -134,6 +172,7 @@ pub(super) fn enrich_trace_reconcile_error( column_name: &str, observed_types: &[ColumnDataType], existing_type: Option, + fixed_type: Option, ) -> servers::error::Error { let observed_types = observed_types .iter() @@ -142,12 +181,20 @@ pub(super) fn enrich_trace_reconcile_error( .join(", "); error::InvalidParameterSnafu { - reason: match existing_type { - Some(existing_type) => format!( + reason: match (existing_type, fixed_type) { + (Some(existing_type), Some(fixed_type)) => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?} and fixed semconv {:?}", + column_name, table_name, observed_types, existing_type, fixed_type + ), + (Some(existing_type), None) => format!( "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}", column_name, table_name, observed_types, existing_type ), - None => format!( + (None, Some(fixed_type)) => format!( + "failed to reconcile trace column '{}' in table '{}' with observed types [{}] and fixed semconv {:?}", + column_name, table_name, observed_types, fixed_type + ), + (None, None) => format!( "failed to reconcile trace column '{}' in table '{}' with observed types [{}]", column_name, table_name, observed_types ), @@ -194,8 +241,12 @@ mod tests { #[test] fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() { assert_eq!( - choose_trace_reconcile_decision(&[ColumnDataType::Int64], Some(ColumnDataType::Int64)) - .unwrap(), + choose_trace_reconcile_decision( + "span_attributes.attr_int", + &[ColumnDataType::Int64], + Some(ColumnDataType::Int64) + ) + .unwrap(), Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64)) ); } @@ -204,6 +255,7 @@ mod tests { fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() { assert_eq!( choose_trace_reconcile_decision( + "span_attributes.attr_double", &[ColumnDataType::Int64, ColumnDataType::Float64], Some(ColumnDataType::Int64) ) @@ -218,6 +270,7 @@ mod tests { fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() { assert_eq!( choose_trace_reconcile_decision( + "span_attributes.attr_double", &[ColumnDataType::Int64, ColumnDataType::Float64], Some(ColumnDataType::Float64) ) @@ -229,6 +282,7 @@ mod tests { #[test] fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() { let err = choose_trace_reconcile_decision( + "span_attributes.attr_numeric", &[ColumnDataType::Boolean, ColumnDataType::Int64], Some(ColumnDataType::Int64), ) @@ -240,6 +294,76 @@ mod tests { fn test_choose_trace_reconcile_decision_request_local_prefers_float64() { assert_eq!( choose_trace_reconcile_decision( + "span_attributes.attr_numeric", + &[ColumnDataType::Int64, ColumnDataType::Float64], + None + ) + .unwrap(), + Some(TraceReconcileDecision::UseRequestLocal( + ColumnDataType::Float64 + )) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_whitelisted_new_int64_column_uses_fixed_type() { + assert_eq!( + choose_trace_reconcile_decision( + "span_attributes.http.response.status_code", + &[ColumnDataType::String, ColumnDataType::Int64], + None + ) + .unwrap(), + Some(TraceReconcileDecision::UseRequestLocal( + ColumnDataType::Int64 + )) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_new_boolean_column_uses_dynamic_resolution() { + assert_eq!( + choose_trace_reconcile_decision( + "span_attributes.messaging.destination.temporary", + &[ColumnDataType::String, ColumnDataType::Boolean], + None + ) + .unwrap(), + Some(TraceReconcileDecision::UseRequestLocal( + ColumnDataType::Boolean + )) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_whitelisted_existing_matching_type_uses_fixed_type() { + assert_eq!( + choose_trace_reconcile_decision( + "resource_attributes.service.name", + &[ColumnDataType::String], + Some(ColumnDataType::String) + ) + .unwrap(), + Some(TraceReconcileDecision::UseExisting(ColumnDataType::String)) + ); + } + + #[test] + fn test_choose_trace_reconcile_decision_whitelisted_existing_conflicting_type_is_error() { + let err = choose_trace_reconcile_decision( + "span_attributes.server.port", + &[ColumnDataType::Int64], + Some(ColumnDataType::String), + ) + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_choose_trace_reconcile_decision_non_whitelisted_retains_dynamic_behavior() { + assert_eq!( + choose_trace_reconcile_decision( + "span_attributes.attr_numeric", &[ColumnDataType::Int64, ColumnDataType::Float64], None ) @@ -268,6 +392,40 @@ mod tests { assert_eq!(err.status_code(), StatusCode::InvalidArguments); } + #[test] + fn test_validate_trace_column_rewrites_whitelisted_values_validate_against_fixed_type() { + let rows = vec![Row { + values: vec![Value { + value_data: Some(ValueData::StringValue("503".to_string())), + }], + }]; + let pending_rewrites = vec![PendingTraceColumnRewrite { + col_idx: 0, + target_type: ColumnDataType::Int64, + column_name: "span_attributes.http.response.status_code".to_string(), + }]; + + validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap(); + } + + #[test] + fn test_validate_trace_column_rewrites_whitelisted_boolean_rejects_invalid_string_parse() { + let rows = vec![Row { + values: vec![Value { + value_data: Some(ValueData::StringValue("not_a_bool".to_string())), + }], + }]; + let pending_rewrites = vec![PendingTraceColumnRewrite { + col_idx: 0, + target_type: ColumnDataType::Boolean, + column_name: "span_attributes.messaging.destination.temporary".to_string(), + }]; + + let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity") + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + #[test] fn test_enrich_trace_reconcile_error_includes_existing_type() { let err = enrich_trace_reconcile_error( @@ -275,6 +433,7 @@ mod tests { "span_attributes.attr_int", &[ColumnDataType::String, ColumnDataType::Int64], Some(ColumnDataType::Boolean), + None, ); assert_eq!(err.status_code(), StatusCode::InvalidArguments); @@ -282,6 +441,21 @@ mod tests { assert!(err.to_string().contains("Boolean")); } + #[test] + fn test_enrich_trace_reconcile_error_includes_fixed_semconv_type() { + let err = enrich_trace_reconcile_error( + "trace_type_atomicity", + "span_attributes.server.port", + &[ColumnDataType::String, ColumnDataType::Int64], + Some(ColumnDataType::String), + Some(ColumnDataType::Int64), + ); + + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + assert!(err.to_string().contains("span_attributes.server.port")); + assert!(err.to_string().contains("fixed semconv Int64")); + } + #[test] fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() { assert!(is_trace_reconcile_candidate_type(ColumnDataType::String));