fix: allow auto type upscale conversion in trace ingestion (#7870)

* fix: allow auto type upscale conversion in trace ingestion

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: immediate return when parse fails

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typos

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: add integration test and fix

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: add Int/Float/Bool to String conversion

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: coerce rows together

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: extract coerce

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: save clone

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add comments

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: unify in- and cross-batch check

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add comments

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-03-30 16:30:32 +08:00
committed by GitHub
parent 6bd14aaf9f
commit a8fe6b5e44
6 changed files with 1258 additions and 31 deletions

View File

@@ -14,6 +14,8 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{ColumnDataType, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
@@ -24,10 +26,14 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, CatalogSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::trace::coerce::{
coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
trace_value_datatype,
};
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -124,7 +130,7 @@ impl OpenTelemetryProtocolHandler for Instance {
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
let (mut requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
@@ -136,6 +142,8 @@ impl OpenTelemetryProtocolHandler for Instance {
OTLP_TRACES_ROWS.inc_by(rows as u64);
if is_trace_v1_model {
self.reconcile_trace_column_types(&mut requests, &ctx)
.await?;
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
@@ -200,3 +208,224 @@ impl OpenTelemetryProtocolHandler for Instance {
Ok(outputs)
}
}
impl Instance {
/// Picks the final datatype for one trace column.
///
/// Existing table schema is authoritative when present. Otherwise we resolve the
/// request-local observed types using the shared trace coercion rules.
fn choose_trace_target_type(
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> ServerResult<Option<ColumnDataType>> {
let Some(existing_type) = existing_type else {
return resolve_new_trace_column_type(observed_types.iter().copied()).map_err(|_| {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.build()
});
};
if observed_types.iter().copied().all(|request_type| {
request_type == existing_type
|| is_supported_trace_coercion(request_type, existing_type)
}) {
Ok(Some(existing_type))
} else {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.fail()
}
}
/// Coerce request column types and values to match the existing table schema
/// for compatible type pairs. Existing table schema wins when present;
/// otherwise the full request batch decides a stable target type.
async fn reconcile_trace_column_types(
&self,
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
) -> ServerResult<()> {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
for req in &mut requests.inserts {
let table = self
.catalog_manager
.table(catalog, &schema, &req.table_name, None)
.await
.context(CatalogSnafu)?;
let Some(rows) = req.rows.as_mut() else {
continue;
};
let table_schema = table.map(|table| table.schema());
let mut pending_coercions = Vec::new();
for (col_idx, col_schema) in rows.schema.iter().enumerate() {
let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else {
continue;
};
let mut observed_types = Vec::new();
push_observed_trace_type(&mut observed_types, current_type);
// Scan the full request first so the final type decision is not affected
// by row order inside the batch.
for row in &rows.rows {
let Some(value) = row
.values
.get(col_idx)
.and_then(|value| value.value_data.as_ref())
else {
continue;
};
let Some(value_type) = trace_value_datatype(value) else {
continue;
};
push_observed_trace_type(&mut observed_types, value_type);
}
let existing_type = table_schema
.as_ref()
.and_then(|schema| schema.column_schema_by_name(&col_schema.column_name))
.and_then(|table_col| {
ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
.ok()
.map(|wrapper| wrapper.datatype())
});
if !observed_types
.iter()
.copied()
.any(is_trace_reconcile_candidate_type)
&& existing_type
.map(|datatype| !is_trace_reconcile_candidate_type(datatype))
.unwrap_or(true)
{
continue;
}
// Decide the final type once per column, then rewrite all affected cells
// together in one row pass below.
let Some(target_type) =
Self::choose_trace_target_type(&observed_types, existing_type).map_err(
|_| {
enrich_trace_reconcile_error(
&req.table_name,
&col_schema.column_name,
&observed_types,
existing_type,
)
},
)?
else {
continue;
};
if observed_types
.iter()
.all(|observed| *observed == target_type)
&& col_schema.datatype == target_type as i32
{
continue;
}
pending_coercions.push((col_idx, target_type, col_schema.column_name.clone()));
}
if pending_coercions.is_empty() {
continue;
}
// Update schema metadata before mutating row values so both stay in sync.
for (col_idx, target_type, ..) in &pending_coercions {
rows.schema[*col_idx].datatype = *target_type as i32;
}
// Apply all pending column rewrites in one row pass.
for row in &mut rows.rows {
for (col_idx, target_type, column_name) in &pending_coercions {
let Some(value) = row.values.get_mut(*col_idx) else {
continue;
};
let Some(request_type) =
value.value_data.as_ref().and_then(trace_value_datatype)
else {
continue;
};
if request_type == *target_type {
continue;
}
value.value_data = coerce_value_data(
&value.value_data,
*target_type,
request_type,
)
.map_err(|_| {
error::InvalidParameterSnafu {
reason: format!(
"failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
column_name, req.table_name, request_type, target_type
),
}
.build()
})?;
}
}
}
Ok(())
}
}
fn enrich_trace_reconcile_error(
table_name: &str,
column_name: &str,
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> servers::error::Error {
let observed_types = observed_types
.iter()
.map(|datatype| format!("{datatype:?}"))
.collect::<Vec<_>>()
.join(", ");
error::InvalidParameterSnafu {
reason: match existing_type {
Some(existing_type) => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
column_name, table_name, observed_types, existing_type
),
None => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
column_name, table_name, observed_types
),
},
}
.build()
}
/// Tells whether `datatype` is one of the scalar trace types that take part in
/// reconciliation (string, boolean, 64-bit integer, 64-bit float).
///
/// Every other column kind, such as JSON or binary, is excluded and keeps its
/// original write path and schema checks.
fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
    const CANDIDATES: [ColumnDataType; 4] = [
        ColumnDataType::String,
        ColumnDataType::Boolean,
        ColumnDataType::Int64,
        ColumnDataType::Float64,
    ];
    CANDIDATES.contains(&datatype)
}
/// Appends `datatype` to `observed_types` unless it is already listed.
///
/// Uses a membership check rather than sorting, so the list stays free of
/// duplicates without relying on enum ordering, and keeps first-seen order.
fn push_observed_trace_type(observed_types: &mut Vec<ColumnDataType>, datatype: ColumnDataType) {
    let already_seen = observed_types.iter().any(|seen| *seen == datatype);
    if already_seen {
        return;
    }
    observed_types.push(datatype);
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod attributes;
pub mod coerce;
pub mod span;
pub mod v0;
pub mod v1;

View File

@@ -0,0 +1,343 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::ColumnDataType;
use api::v1::value::ValueData;
/// Error returned by the trace coercion helpers in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceCoerceError {
/// The requested conversion is not supported, or the value could not be
/// converted (for example a string that fails to parse).
Unsupported,
}
/// Reports whether a value of `request_type` may be coerced into `target_type`.
///
/// Conversions that always succeed:
/// - `Int64` -> `Float64`
/// - `Int64` -> `String`
/// - `Float64` -> `String`
/// - `Boolean` -> `String`
///
/// Conversions that parse the string and can therefore fail for a given value
/// (the coercion then reports `TraceCoerceError::Unsupported`):
/// - `String` -> `Int64`
/// - `String` -> `Float64`
/// - `String` -> `Boolean`
///
/// Identical types are not a coercion and yield `false` here.
pub fn is_supported_trace_coercion(
    request_type: ColumnDataType,
    target_type: ColumnDataType,
) -> bool {
    match target_type {
        ColumnDataType::String => matches!(
            request_type,
            ColumnDataType::Int64 | ColumnDataType::Float64 | ColumnDataType::Boolean
        ),
        ColumnDataType::Float64 => matches!(
            request_type,
            ColumnDataType::Int64 | ColumnDataType::String
        ),
        ColumnDataType::Int64 | ColumnDataType::Boolean => {
            matches!(request_type, ColumnDataType::String)
        }
        _ => false,
    }
}
/// Coerces an optional cell value from `request_type` to `target`.
///
/// A null cell (`None`) passes through unchanged as `Ok(None)`. A non-null cell
/// is converted with `coerce_non_null_value`; when that yields nothing the call
/// fails with `TraceCoerceError::Unsupported`.
pub fn coerce_value_data(
    value: &Option<ValueData>,
    target: ColumnDataType,
    request_type: ColumnDataType,
) -> Result<Option<ValueData>, TraceCoerceError> {
    match value.as_ref() {
        None => Ok(None),
        Some(raw) => coerce_non_null_value(target, request_type, raw)
            .map(Some)
            .ok_or(TraceCoerceError::Unsupported),
    }
}
/// Converts one non-null value from `request_type` to `target`.
///
/// The conversion only applies when the concrete `value` variant agrees with
/// `request_type`. Numeric and boolean values are widened or rendered as
/// strings; string values are parsed into the target type.
///
/// Returns `None` when the type pair is not supported, when `value` does not
/// match `request_type`, or when a string fails to parse.
pub fn coerce_non_null_value(
    target: ColumnDataType,
    request_type: ColumnDataType,
    value: &ValueData,
) -> Option<ValueData> {
    match value {
        ValueData::I64Value(int) if request_type == ColumnDataType::Int64 => match target {
            ColumnDataType::Float64 => Some(ValueData::F64Value(*int as f64)),
            ColumnDataType::String => Some(ValueData::StringValue(int.to_string())),
            _ => None,
        },
        ValueData::F64Value(float) if request_type == ColumnDataType::Float64 => match target {
            ColumnDataType::String => Some(ValueData::StringValue(float.to_string())),
            _ => None,
        },
        ValueData::BoolValue(flag) if request_type == ColumnDataType::Boolean => match target {
            ColumnDataType::String => Some(ValueData::StringValue(flag.to_string())),
            _ => None,
        },
        ValueData::StringValue(text) if request_type == ColumnDataType::String => match target {
            ColumnDataType::Int64 => text.parse::<i64>().ok().map(ValueData::I64Value),
            ColumnDataType::Float64 => text.parse::<f64>().ok().map(ValueData::F64Value),
            ColumnDataType::Boolean => text.parse::<bool>().ok().map(ValueData::BoolValue),
            _ => None,
        },
        _ => None,
    }
}
/// Maps a concrete cell value to the column datatype it represents.
///
/// Only string, boolean, 64-bit integer, 64-bit float and binary values are
/// recognised; any other variant yields `None`.
pub fn trace_value_datatype(value: &ValueData) -> Option<ColumnDataType> {
    let datatype = match value {
        ValueData::StringValue(_) => ColumnDataType::String,
        ValueData::BoolValue(_) => ColumnDataType::Boolean,
        ValueData::I64Value(_) => ColumnDataType::Int64,
        ValueData::F64Value(_) => ColumnDataType::Float64,
        ValueData::BinaryValue(_) => ColumnDataType::Binary,
        _ => return None,
    };
    Some(datatype)
}
/// Resolves the final datatype for a new trace column when there is no existing
/// table schema to override the request-local observations.
///
/// The observed types are reduced to their distinct set first, regardless of the
/// order or adjacency in which duplicates appear, so `[String, Int64, String]`
/// resolves exactly like `[String, Int64]`.
///
/// Resolution rules on the distinct set:
/// - no types: `Ok(None)`
/// - one type: that type
/// - `String` with `Boolean`, `Int64` or `Float64`: the non-string type
/// - `Int64` with `Float64`: `Float64`
/// - anything else: `Err(TraceCoerceError::Unsupported)`
pub fn resolve_new_trace_column_type(
    observed_types: impl IntoIterator<Item = ColumnDataType>,
) -> Result<Option<ColumnDataType>, TraceCoerceError> {
    // `Vec::dedup` only drops *consecutive* repeats, so build the distinct set
    // with a membership check instead; the list is tiny, so this stays cheap.
    let mut distinct: Vec<ColumnDataType> = Vec::new();
    for datatype in observed_types {
        if !distinct.contains(&datatype) {
            distinct.push(datatype);
        }
    }

    match distinct.as_slice() {
        [] => Ok(None),
        [only] => Ok(Some(*only)),
        [first, second] => match (*first, *second) {
            (ColumnDataType::String, ColumnDataType::Boolean)
            | (ColumnDataType::Boolean, ColumnDataType::String) => {
                Ok(Some(ColumnDataType::Boolean))
            }
            (ColumnDataType::String, ColumnDataType::Int64)
            | (ColumnDataType::Int64, ColumnDataType::String) => Ok(Some(ColumnDataType::Int64)),
            (ColumnDataType::String, ColumnDataType::Float64)
            | (ColumnDataType::Float64, ColumnDataType::String)
            | (ColumnDataType::Int64, ColumnDataType::Float64)
            | (ColumnDataType::Float64, ColumnDataType::Int64) => {
                Ok(Some(ColumnDataType::Float64))
            }
            _ => Err(TraceCoerceError::Unsupported),
        },
        _ => Err(TraceCoerceError::Unsupported),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a string slice as a non-null string cell.
    fn string_cell(text: &str) -> Option<ValueData> {
        Some(ValueData::StringValue(text.to_string()))
    }

    #[test]
    fn test_coerce_int64_to_float64() {
        let input = Some(ValueData::I64Value(42));
        let coerced = coerce_value_data(&input, ColumnDataType::Float64, ColumnDataType::Int64);
        assert_eq!(coerced, Ok(Some(ValueData::F64Value(42.0))));
    }

    #[test]
    fn test_coerce_string_to_int64() {
        let coerced = coerce_value_data(
            &string_cell("123"),
            ColumnDataType::Int64,
            ColumnDataType::String,
        );
        assert_eq!(coerced, Ok(Some(ValueData::I64Value(123))));
    }

    #[test]
    fn test_coerce_int64_to_string() {
        let input = Some(ValueData::I64Value(123));
        let coerced = coerce_value_data(&input, ColumnDataType::String, ColumnDataType::Int64);
        assert_eq!(coerced, Ok(string_cell("123")));
    }

    #[test]
    fn test_coerce_string_to_float64() {
        let coerced = coerce_value_data(
            &string_cell("1.5"),
            ColumnDataType::Float64,
            ColumnDataType::String,
        );
        assert_eq!(coerced, Ok(Some(ValueData::F64Value(1.5))));
    }

    #[test]
    fn test_coerce_float64_to_string() {
        let input = Some(ValueData::F64Value(1.5));
        let coerced = coerce_value_data(&input, ColumnDataType::String, ColumnDataType::Float64);
        assert_eq!(coerced, Ok(string_cell("1.5")));
    }

    #[test]
    fn test_coerce_string_to_boolean() {
        // Both boolean literals must parse.
        for (text, expected) in [("true", true), ("false", false)].iter() {
            let coerced = coerce_value_data(
                &string_cell(text),
                ColumnDataType::Boolean,
                ColumnDataType::String,
            );
            assert_eq!(coerced, Ok(Some(ValueData::BoolValue(*expected))));
        }
    }

    #[test]
    fn test_coerce_boolean_to_string() {
        let input = Some(ValueData::BoolValue(true));
        let coerced = coerce_value_data(&input, ColumnDataType::String, ColumnDataType::Boolean);
        assert_eq!(coerced, Ok(string_cell("true")));
    }

    #[test]
    fn test_coerce_unparsable_string() {
        let coerced = coerce_value_data(
            &string_cell("not_a_number"),
            ColumnDataType::Int64,
            ColumnDataType::String,
        );
        assert_eq!(coerced, Err(TraceCoerceError::Unsupported));
    }

    #[test]
    fn test_coerce_float64_to_int64_not_supported() {
        let input = Some(ValueData::F64Value(1.5));
        let coerced = coerce_value_data(&input, ColumnDataType::Int64, ColumnDataType::Float64);
        assert_eq!(coerced, Err(TraceCoerceError::Unsupported));
    }

    #[test]
    fn test_coerce_none_value() {
        let coerced = coerce_value_data(&None, ColumnDataType::Float64, ColumnDataType::Int64);
        assert_eq!(coerced, Ok(None));
    }

    #[test]
    fn test_is_supported_trace_coercion() {
        let supported = [
            (ColumnDataType::Int64, ColumnDataType::Float64),
            (ColumnDataType::Int64, ColumnDataType::String),
            (ColumnDataType::Float64, ColumnDataType::String),
            (ColumnDataType::Boolean, ColumnDataType::String),
            (ColumnDataType::String, ColumnDataType::Int64),
            (ColumnDataType::String, ColumnDataType::Float64),
            (ColumnDataType::String, ColumnDataType::Boolean),
        ];
        for &(request_type, target_type) in &supported {
            assert!(is_supported_trace_coercion(request_type, target_type));
        }

        assert!(!is_supported_trace_coercion(
            ColumnDataType::Binary,
            ColumnDataType::Json
        ));
    }

    #[test]
    fn test_trace_value_datatype() {
        let cases = [
            (ValueData::StringValue("x".to_string()), ColumnDataType::String),
            (ValueData::BoolValue(true), ColumnDataType::Boolean),
            (ValueData::I64Value(1), ColumnDataType::Int64),
            (ValueData::F64Value(1.0), ColumnDataType::Float64),
            (ValueData::BinaryValue(vec![1_u8]), ColumnDataType::Binary),
        ];
        for (value, expected) in &cases {
            assert_eq!(trace_value_datatype(value), Some(*expected));
        }
    }

    #[test]
    fn test_resolve_new_trace_column_type() {
        let resolvable: [(&[ColumnDataType], ColumnDataType); 5] = [
            (&[ColumnDataType::Int64], ColumnDataType::Int64),
            (
                &[ColumnDataType::String, ColumnDataType::Int64],
                ColumnDataType::Int64,
            ),
            (
                &[ColumnDataType::String, ColumnDataType::Float64],
                ColumnDataType::Float64,
            ),
            (
                &[ColumnDataType::String, ColumnDataType::Boolean],
                ColumnDataType::Boolean,
            ),
            (
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                ColumnDataType::Float64,
            ),
        ];
        for (observed, expected) in &resolvable {
            assert_eq!(
                resolve_new_trace_column_type(observed.iter().copied()),
                Ok(Some(*expected))
            );
        }

        // Three distinct types have no common target.
        assert_eq!(
            resolve_new_trace_column_type([
                ColumnDataType::String,
                ColumnDataType::Int64,
                ColumnDataType::Float64,
            ]),
            Err(TraceCoerceError::Unsupported)
        );
    }
}

View File

@@ -230,7 +230,7 @@ fn write_trace_operations_to_row(
Ok(())
}
fn write_attributes(
pub(crate) fn write_attributes(
writer: &mut TableData,
prefix: &str,
attributes: Attributes,
@@ -247,44 +247,40 @@ fn write_attributes(
let key = format!("{}.{}", prefix, key_suffix);
match attr.value.and_then(|v| v.value) {
Some(OtlpValue::StringValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_string_column_data(&key, Some(v))),
// Keep the raw request value here. Mixed trace types are reconciled later
// in the frontend once we can also see the existing table schema.
writer.write_field_unchecked(
&key,
ColumnDataType::String,
Some(ValueData::StringValue(v)),
row,
)?;
);
}
Some(OtlpValue::BoolValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Boolean,
Some(ValueData::BoolValue(v)),
)),
// Do not coerce or promote types while building the request-local rows.
writer.write_field_unchecked(
&key,
ColumnDataType::Boolean,
Some(ValueData::BoolValue(v)),
row,
)?;
);
}
Some(OtlpValue::IntValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Int64,
Some(ValueData::I64Value(v)),
)),
// Preserving the original value avoids order-dependent behavior inside one batch.
writer.write_field_unchecked(
&key,
ColumnDataType::Int64,
Some(ValueData::I64Value(v)),
row,
)?;
);
}
Some(OtlpValue::DoubleValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Float64,
Some(ValueData::F64Value(v)),
)),
writer.write_field_unchecked(
&key,
ColumnDataType::Float64,
Some(ValueData::F64Value(v)),
row,
)?;
);
}
Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
writer,
@@ -315,3 +311,214 @@ fn write_attributes(
Ok(())
}
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};

    use super::*;
    use crate::otlp::trace::attributes::Attributes;
    use crate::row_writer::TableData;

    /// Builds one OTLP key/value attribute.
    fn make_kv(key: &str, value: OtlpValue) -> KeyValue {
        let value = Some(AnyValue { value: Some(value) });
        KeyValue {
            key: key.to_string(),
            value,
        }
    }

    /// Writes every given value as the `val` attribute of its own row, under the
    /// `attr` prefix, and returns the writer holding those rows.
    fn write_val_rows(values: Vec<OtlpValue>) -> TableData {
        let mut writer = TableData::new(4, 2);
        for value in values {
            let attrs = Attributes::from(vec![make_kv("val", value)]);
            let mut row = writer.alloc_one_row();
            write_attributes(&mut writer, "attr", attrs, &mut row).unwrap();
            writer.add_row(row);
        }
        writer
    }

    #[test]
    fn test_keep_mixed_numeric_values_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::DoubleValue(1.5),
            OtlpValue::IntValue(42),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::Float64 as i32);
        assert_eq!(
            rows[0].values[col_idx].value_data,
            Some(ValueData::F64Value(1.5))
        );
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::I64Value(42))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_int_values_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::IntValue(10),
            OtlpValue::StringValue("20".to_string()),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::Int64 as i32);
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::StringValue("20".to_string()))
        );
    }

    #[test]
    fn test_keep_first_seen_schema_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::StringValue("10".to_string()),
            OtlpValue::IntValue(20),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::String as i32);
        assert_eq!(
            rows[0].values[col_idx].value_data,
            Some(ValueData::StringValue("10".to_string()))
        );
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::I64Value(20))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_float_values_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::DoubleValue(1.5),
            OtlpValue::StringValue("1.5".to_string()),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::Float64 as i32);
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::StringValue("1.5".to_string()))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_bool_values_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::StringValue("true".to_string()),
            OtlpValue::BoolValue(false),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::String as i32);
        assert_eq!(
            rows[0].values[col_idx].value_data,
            Some(ValueData::StringValue("true".to_string()))
        );
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::BoolValue(false))
        );
    }

    #[test]
    fn test_keep_mixed_binary_and_string_values_until_frontend_reconciliation() {
        let writer = write_val_rows(vec![
            OtlpValue::BytesValue(vec![1_u8, 2, 3]),
            OtlpValue::StringValue("false".to_string()),
        ]);

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[col_idx].datatype, ColumnDataType::Binary as i32);
        assert_eq!(
            rows[0].values[col_idx].value_data,
            Some(ValueData::BinaryValue(vec![1_u8, 2, 3]))
        );
        assert_eq!(
            rows[1].values[col_idx].value_data,
            Some(ValueData::StringValue("false".to_string()))
        );
    }

    // Conversion matrix coverage lives in the shared coercion helper tests.
}

View File

@@ -77,6 +77,35 @@ impl TableData {
pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
(self.schema, self.rows)
}
/// Stores `value` in the column called `name` for the row being built, without
/// checking `datatype` against the column's recorded type.
///
/// If the column is already known, only the cell in `one_row` is replaced; the
/// schema entry keeps the datatype recorded when the column was first seen.
/// Otherwise a new field column with `datatype` is appended to the schema and
/// `value` is pushed onto `one_row`.
///
/// The OTLP trace v1 path relies on this to keep raw mixed values within one
/// request, so the frontend can later reconcile them against the full batch and
/// the existing table schema.
///
/// NOTE(review): the existing-column branch indexes `one_row` directly, so it
/// presumably expects the row to already have a slot for every known column —
/// confirm against `alloc_one_row`.
pub fn write_field_unchecked(
    &mut self,
    name: impl ToString,
    datatype: ColumnDataType,
    value: Option<ValueData>,
    one_row: &mut Vec<Value>,
) {
    let column_name = name.to_string();
    match self.column_indexes.get(&column_name).copied() {
        Some(existing_index) => {
            one_row[existing_index].value_data = value;
        }
        None => {
            let new_index = self.schema.len();
            self.column_indexes.insert(column_name.clone(), new_index);
            self.schema.push(ColumnSchema {
                column_name,
                datatype: datatype as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            });
            one_row.push(Value { value_data: value });
        }
    }
}
}
pub struct MultiTableData {

View File

@@ -5283,6 +5283,315 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
let coercion_table_name = "trace_type_coercion";
let coercion_req = make_trace_v1_request(
"type-coercion",
vec![make_trace_v1_span(
"00000000000000000000000000000001",
"0000000000000001",
"coercion-seed",
1_736_480_942_444_376_000,
1_736_480_942_444_499_000,
vec![
make_double_attr("attr_float", 1.5),
make_int_attr("attr_int", 10),
make_bool_attr("attr_bool", true),
],
)],
);
let res = send_trace_v1_req(&client, coercion_table_name, coercion_req, false).await;
assert_eq!(StatusCode::OK, res.status());
let coercion_req = make_trace_v1_request(
"type-coercion",
vec![make_trace_v1_span(
"00000000000000000000000000000002",
"0000000000000002",
"coercion-apply",
1_736_480_942_444_589_000,
1_736_480_942_444_712_000,
vec![
make_int_attr("attr_float", 2),
make_string_attr("attr_int", "20"),
make_string_attr("attr_bool", "false"),
],
)],
);
let res = send_trace_v1_req(&client, coercion_table_name, coercion_req, false).await;
assert_eq!(StatusCode::OK, res.status());
let string_target_table_name = "trace_type_coercion_to_string";
let string_target_seed_req = make_trace_v1_request(
"type-coercion-string",
vec![make_trace_v1_span(
"00000000000000000000000000000021",
"0000000000000021",
"string-target-seed",
1_736_480_942_444_720_000,
1_736_480_942_444_820_000,
vec![
make_string_attr("attr_int", "seed"),
make_string_attr("attr_float", "seed"),
make_string_attr("attr_bool", "seed"),
],
)],
);
let res = send_trace_v1_req(
&client,
string_target_table_name,
string_target_seed_req,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let string_target_req = make_trace_v1_request(
"type-coercion-string",
vec![make_trace_v1_span(
"00000000000000000000000000000022",
"0000000000000022",
"string-target-apply",
1_736_480_942_444_830_000,
1_736_480_942_444_930_000,
vec![
make_int_attr("attr_int", 20),
make_double_attr("attr_float", 2.5),
make_bool_attr("attr_bool", false),
],
)],
);
let res = send_trace_v1_req(&client, string_target_table_name, string_target_req, false).await;
assert_eq!(StatusCode::OK, res.status());
validate_data(
"otlp_traces_v1_type_coercion_to_string_rows",
&client,
&format!(
"select trace_id, \"span_attributes.attr_bool\", \"span_attributes.attr_float\", \"span_attributes.attr_int\" from {} order by trace_id;",
string_target_table_name
),
r#"[["00000000000000000000000000000021","seed","seed","seed"],["00000000000000000000000000000022","false","2.5","20"]]"#,
)
.await;
validate_data(
"otlp_traces_v1_type_coercion_to_string_schema",
&client,
"select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_coercion_to_string' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;",
r#"[["span_attributes.attr_bool","string","FIELD"],["span_attributes.attr_float","string","FIELD"],["span_attributes.attr_int","string","FIELD"]]"#,
)
.await;
let intra_batch_prefer_non_string_table_name = "trace_type_prefer_non_string";
let intra_batch_prefer_non_string_req = make_trace_v1_request(
"type-prefer-non-string",
vec![
make_trace_v1_span(
"00000000000000000000000000000031",
"0000000000000031",
"prefer-non-string-seed",
1_736_480_942_444_940_000,
1_736_480_942_445_040_000,
vec![
make_string_attr("attr_int", "10"),
make_string_attr("attr_float", "1.5"),
make_string_attr("attr_bool", "true"),
],
),
make_trace_v1_span(
"00000000000000000000000000000032",
"0000000000000032",
"prefer-non-string-apply",
1_736_480_942_445_050_000,
1_736_480_942_445_150_000,
vec![
make_int_attr("attr_int", 20),
make_double_attr("attr_float", 2.5),
make_bool_attr("attr_bool", false),
],
),
],
);
let res = send_trace_v1_req(
&client,
intra_batch_prefer_non_string_table_name,
intra_batch_prefer_non_string_req,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
validate_data(
"otlp_traces_v1_prefer_non_string_rows",
&client,
&format!(
"select trace_id, \"span_attributes.attr_bool\", \"span_attributes.attr_float\", \"span_attributes.attr_int\" from {} order by trace_id;",
intra_batch_prefer_non_string_table_name
),
r#"[["00000000000000000000000000000031",true,1.5,10],["00000000000000000000000000000032",false,2.5,20]]"#,
)
.await;
validate_data(
"otlp_traces_v1_prefer_non_string_schema",
&client,
"select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_prefer_non_string' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;",
r#"[["span_attributes.attr_bool","boolean","FIELD"],["span_attributes.attr_float","double","FIELD"],["span_attributes.attr_int","bigint","FIELD"]]"#,
)
.await;
let existing_float_table_name = "trace_type_existing_float_prefers_schema";
let existing_float_seed_req = make_trace_v1_request(
"type-existing-float",
vec![make_trace_v1_span(
"00000000000000000000000000000041",
"0000000000000041",
"existing-float-seed",
1_736_480_942_445_160_000,
1_736_480_942_445_260_000,
vec![make_double_attr("attr_num", 1.25)],
)],
);
let res = send_trace_v1_req(
&client,
existing_float_table_name,
existing_float_seed_req,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let existing_float_req = make_trace_v1_request(
"type-existing-float",
vec![
make_trace_v1_span(
"00000000000000000000000000000042",
"0000000000000042",
"existing-float-int-first",
1_736_480_942_445_270_000,
1_736_480_942_445_370_000,
vec![make_int_attr("attr_num", 2)],
),
make_trace_v1_span(
"00000000000000000000000000000043",
"0000000000000043",
"existing-float-float-later",
1_736_480_942_445_380_000,
1_736_480_942_445_480_000,
vec![make_double_attr("attr_num", 3.5)],
),
],
);
let res = send_trace_v1_req(
&client,
existing_float_table_name,
existing_float_req,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
validate_data(
"otlp_traces_v1_existing_float_prefers_schema_rows",
&client,
&format!(
"select trace_id, \"span_attributes.attr_num\" from {} order by trace_id;",
existing_float_table_name
),
r#"[["00000000000000000000000000000041",1.25],["00000000000000000000000000000042",2.0],["00000000000000000000000000000043",3.5]]"#,
)
.await;
validate_data(
"otlp_traces_v1_existing_float_prefers_schema_type",
&client,
"select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_existing_float_prefers_schema' and column_name = 'span_attributes.attr_num';",
r#"[["span_attributes.attr_num","double","FIELD"]]"#,
)
.await;
validate_data(
"otlp_traces_v1_type_coercion_rows",
&client,
&format!(
"select trace_id, \"span_attributes.attr_int\", \"span_attributes.attr_bool\" from {} order by trace_id;",
coercion_table_name
),
r#"[["00000000000000000000000000000001",10,true],["00000000000000000000000000000002",20,false]]"#,
)
.await;
validate_data(
"otlp_traces_v1_type_coercion_float_sum",
&client,
&format!(
"select sum(\"span_attributes.attr_float\") from {};",
coercion_table_name
),
r#"[[3.5]]"#,
)
.await;
validate_data(
"otlp_traces_v1_type_coercion_schema",
&client,
"select column_name, lower(data_type), semantic_type from information_schema.columns where table_name = 'trace_type_coercion' and column_name in ('span_attributes.attr_bool', 'span_attributes.attr_float', 'span_attributes.attr_int') order by column_name;",
r#"[["span_attributes.attr_bool","boolean","FIELD"],["span_attributes.attr_float","double","FIELD"],["span_attributes.attr_int","bigint","FIELD"]]"#,
)
.await;
let abort_table_name = "trace_type_abort";
let abort_seed_req = make_trace_v1_request(
"type-abort",
vec![make_trace_v1_span(
"00000000000000000000000000000011",
"0000000000000011",
"abort-seed",
1_736_480_942_444_800_000,
1_736_480_942_444_900_000,
vec![make_int_attr("attr_int", 10)],
)],
);
let res = send_trace_v1_req(&client, abort_table_name, abort_seed_req, false).await;
assert_eq!(StatusCode::OK, res.status());
let abort_req = make_trace_v1_request(
"type-abort",
vec![
make_trace_v1_span(
"00000000000000000000000000000012",
"0000000000000012",
"abort-parseable",
1_736_480_942_445_000_000,
1_736_480_942_445_100_000,
vec![make_string_attr("attr_int", "20")],
),
make_trace_v1_span(
"00000000000000000000000000000013",
"0000000000000013",
"abort-unparsable",
1_736_480_942_445_200_000,
1_736_480_942_445_300_000,
vec![make_string_attr("attr_int", "not_a_number")],
),
],
);
let res = send_trace_v1_req(&client, abort_table_name, abort_req, false).await;
assert_eq!(StatusCode::BAD_REQUEST, res.status());
let body: Value = res.json().await;
assert!(
body["error"].as_str().unwrap().contains(
"failed to coerce trace column 'span_attributes.attr_int' in table 'trace_type_abort'"
),
"unexpected error body: {body}"
);
validate_data(
"otlp_traces_v1_type_abort_rows",
&client,
&format!(
"select trace_id, \"span_attributes.attr_int\" from {} order by trace_id;",
abort_table_name
),
r#"[["00000000000000000000000000000011",10]]"#,
)
.await;
guard.remove_all().await;
}
@@ -7515,6 +7824,115 @@ async fn send_req(
req.header("content-length", len).send().await
}
/// Sends a trace export request to the `/v1/otlp/v1/traces` endpoint.
///
/// The request is serialized with `encode_to_vec` and posted through
/// `send_req` together with three headers:
/// - `content-type: application/x-protobuf`
/// - `x-greptime-pipeline-name`, set to the internal trace v1 pipeline name
/// - `x-greptime-trace-table-name`, set to `table_name`
///
/// `with_gzip` is passed through to `send_req` untouched.
///
/// # Panics
///
/// Panics if `table_name` cannot be converted into a header value.
async fn send_trace_v1_req(
    client: &TestClient,
    table_name: &str,
    req: ExportTraceServiceRequest,
    with_gzip: bool,
) -> TestResponse {
    let content_type_header = (
        HeaderName::from_static("content-type"),
        HeaderValue::from_static("application/x-protobuf"),
    );
    let pipeline_header = (
        HeaderName::from_static("x-greptime-pipeline-name"),
        HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
    );
    // The table name is caller-provided, so it goes through the fallible conversion.
    let table_header = (
        HeaderName::from_static("x-greptime-trace-table-name"),
        HeaderValue::from_str(table_name).unwrap(),
    );

    let headers = vec![content_type_header, pipeline_header, table_header];
    let payload = req.encode_to_vec();

    send_req(client, headers, "/v1/otlp/v1/traces", payload, with_gzip).await
}
fn make_trace_v1_request(service_name: &str, spans: Vec<Value>) -> ExportTraceServiceRequest {
serde_json::from_value(json!({
"resourceSpans": [{
"resource": {
"attributes": [{
"key": "service.name",
"value": { "stringValue": service_name }
}]
},
"scopeSpans": [{
"scope": {
"name": "trace-v1-type-tests"
},
"spans": spans
}],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
}]
}))
.unwrap()
}
/// Builds the JSON representation of one span for use in a trace v1 test request.
///
/// `trace_id`, `span_id`, `name` and `attributes` are copied into the span as
/// given. Both timestamps are emitted as decimal strings. `kind` is fixed to
/// `2` and the status is an empty message with code `0`.
fn make_trace_v1_span(
    trace_id: &str,
    span_id: &str,
    name: &str,
    start_time_unix_nano: i64,
    end_time_unix_nano: i64,
    attributes: Vec<Value>,
) -> Value {
    // Nanosecond timestamps are written as strings rather than JSON numbers.
    let start_nanos = start_time_unix_nano.to_string();
    let end_nanos = end_time_unix_nano.to_string();
    let status = json!({ "message": "", "code": 0 });

    json!({
        "traceId": trace_id,
        "spanId": span_id,
        "name": name,
        "kind": 2,
        "startTimeUnixNano": start_nanos,
        "endTimeUnixNano": end_nanos,
        "attributes": attributes,
        "status": status
    })
}
/// Builds a JSON attribute entry named `key` whose value is the string `value`,
/// stored under `stringValue`.
fn make_string_attr(key: &str, value: &str) -> Value {
    let wrapped = json!({ "stringValue": value });
    json!({ "key": key, "value": wrapped })
}
/// Builds a JSON attribute entry named `key` whose value is the integer `value`,
/// stored under `intValue`.
fn make_int_attr(key: &str, value: i64) -> Value {
    // The integer is written as its decimal string form, not as a JSON number.
    let rendered = value.to_string();
    let wrapped = json!({ "intValue": rendered });
    json!({ "key": key, "value": wrapped })
}
/// Builds a JSON attribute entry named `key` whose value is the float `value`,
/// stored under `doubleValue`.
fn make_double_attr(key: &str, value: f64) -> Value {
    let wrapped = json!({ "doubleValue": value });
    json!({ "key": key, "value": wrapped })
}
/// Builds a JSON attribute entry named `key` whose value is the boolean `value`,
/// stored under `boolValue`.
fn make_bool_attr(key: &str, value: bool) -> Value {
    let wrapped = json!({ "boolValue": value });
    json!({ "key": key, "value": wrapped })
}
fn get_rows_from_output(output: &str) -> String {
let resp: Value = serde_json::from_str(output).unwrap();
resp.get("output")