refactor: extract otel helper (#7910)

* refactor: extract otel helper

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: move to submodule

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-04-03 12:13:44 +08:00
committed by GitHub
parent a424ee1c0a
commit a9256f0310
2 changed files with 317 additions and 248 deletions

View File

@@ -36,10 +36,7 @@ use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::trace::TraceAuxData;
use servers::otlp::trace::coerce::{
coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
trace_value_datatype,
};
use servers::otlp::trace::coerce::{coerce_value_data, trace_value_datatype};
use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup};
use servers::query_handler::{
OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome,
@@ -49,10 +46,16 @@ use snafu::{IntoError, ResultExt};
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use crate::instance::Instance;
use crate::instance::otlp::trace_types::{
PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error,
is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites,
};
use crate::metrics::{
OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS,
};
pub mod trace_types;
const TRACE_INGEST_CHUNK_SIZE: usize = 64;
const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4;
@@ -63,33 +66,6 @@ enum ChunkFailureReaction {
Propagate,
}
/// Outcome of reconciling the value types observed for one trace column with
/// the column's existing type, if any. Every variant carries the column type
/// that results from the decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceReconcileDecision {
/// Keep the existing column type.
UseExisting(ColumnDataType),
/// No existing column type was given; use the type resolved from the request.
UseRequestLocal(ColumnDataType),
/// Alter the existing column to the carried type.
AlterExistingTo(ColumnDataType),
}
impl TraceReconcileDecision {
/// Returns the column type carried by the decision, regardless of variant.
fn target_type(self) -> ColumnDataType {
match self {
Self::UseExisting(target_type)
| Self::UseRequestLocal(target_type)
| Self::AlterExistingTo(target_type) => target_type,
}
}
/// Returns `true` only for `AlterExistingTo`.
fn requires_alter(self) -> bool {
matches!(self, Self::AlterExistingTo(_))
}
}
/// One column whose request values are to be rewritten to `target_type`.
struct PendingTraceColumnRewrite {
// Position of the column's cell inside each row's `values`.
col_idx: usize,
// Type the column's cells are coerced to.
target_type: ColumnDataType,
// Column name, used when reporting coercion failures.
column_name: String,
}
impl ChunkFailureReaction {
fn as_metric_label(self) -> &'static str {
match self {
@@ -576,52 +552,6 @@ impl Instance {
Some(summary)
}
/// Picks the reconciliation action for one trace column.
///
/// Existing table schema is authoritative unless the only incompatible case is
/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data.
///
/// Without an existing type the decision is whatever
/// `resolve_new_trace_column_type` resolves from the observed types, wrapped in
/// `UseRequestLocal`. Any combination that fits none of the cases is reported
/// as an `InvalidParameter` error with reason "unsupported trace type mix".
fn choose_trace_reconcile_decision(
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> ServerResult<Option<TraceReconcileDecision>> {
// No existing column: the request alone determines the type.
let Some(existing_type) = existing_type else {
return resolve_new_trace_column_type(observed_types.iter().copied())
.map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
.map_err(|_| {
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.build()
});
};
// Every observed type already matches or can be coerced to the existing type.
if observed_types.iter().all(|&request_type| {
request_type == existing_type
|| is_supported_trace_coercion(request_type, existing_type)
}) {
return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
}
// Sole widening case: existing Int64, request holds only Int64/Float64 and
// at least one Float64.
if existing_type == ColumnDataType::Int64
&& observed_types.contains(&ColumnDataType::Float64)
&& observed_types.iter().all(|observed_type| {
matches!(
observed_type,
ColumnDataType::Int64 | ColumnDataType::Float64
)
})
{
return Ok(Some(TraceReconcileDecision::AlterExistingTo(
ColumnDataType::Float64,
)));
}
error::InvalidParameterSnafu {
reason: "unsupported trace type mix".to_string(),
}
.fail()
}
/// Widen existing trace table columns to Float64 before request rewrite.
async fn alter_trace_table_columns_to_float64(
&self,
@@ -763,7 +693,7 @@ impl Instance {
// Decide the final type once per column, then rewrite all affected cells
// together in one row pass below.
let Some(decision) =
Self::choose_trace_reconcile_decision(&observed_types, existing_type).map_err(
choose_trace_reconcile_decision(&observed_types, existing_type).map_err(
|_| {
enrich_trace_reconcile_error(
&req.table_name,
@@ -860,44 +790,6 @@ impl Instance {
}
}
/// Validate all pending trace column rewrites before any schema mutation happens.
///
/// Each cell addressed by a pending rewrite is test-coerced to the rewrite's
/// target type. The first failing cell aborts with an `InvalidParameter` error
/// naming the column, table, source type and target type.
fn validate_trace_column_rewrites(
rows: &[api::v1::Row],
pending_rewrites: &[PendingTraceColumnRewrite],
table_name: &str,
) -> ServerResult<()> {
for row in rows {
for pending_rewrite in pending_rewrites {
// Rows that have no cell at this index are skipped.
let Some(value) = row.values.get(pending_rewrite.col_idx) else {
continue;
};
// Cells without data, or for which `trace_value_datatype` yields no type, are skipped.
let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
else {
continue;
};
// Already the target type: nothing to coerce.
if request_type == pending_rewrite.target_type {
continue;
}
// Only the success of the coercion matters here; the coerced value is dropped.
coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
.map_err(|_| {
error::InvalidParameterSnafu {
reason: format!(
"failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
pending_rewrite.column_name,
table_name,
request_type,
pending_rewrite.target_type
),
}
.build()
})?;
}
}
Ok(())
}
/// Preserve the original alter failure status so chunk retry behavior stays correct.
fn wrap_trace_alter_failure<E>(err: E) -> servers::error::Error
where
@@ -906,64 +798,13 @@ where
error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err))
}
/// Builds an `InvalidParameter` error describing a failed reconciliation of one
/// trace column: it names the column, the table, the observed types, and the
/// existing column type when one is given.
fn enrich_trace_reconcile_error(
table_name: &str,
column_name: &str,
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> servers::error::Error {
// Render the observed types with their Debug names, comma separated.
let observed_types = observed_types
.iter()
.map(|datatype| format!("{datatype:?}"))
.collect::<Vec<_>>()
.join(", ");
error::InvalidParameterSnafu {
reason: match existing_type {
Some(existing_type) => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
column_name, table_name, observed_types, existing_type
),
None => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
column_name, table_name, observed_types
),
},
}
.build()
}
/// Only these trace scalar types participate in reconciliation. Other column kinds
/// such as JSON and binary keep their original write path and schema checks.
///
/// Returns `true` for String, Boolean, Int64 and Float64, `false` otherwise.
fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
matches!(
datatype,
ColumnDataType::String
| ColumnDataType::Boolean
| ColumnDataType::Int64
| ColumnDataType::Float64
)
}
/// Keeps the observed type list small without depending on enum ordering.
///
/// Appends `datatype` only when it is not already in `observed_types`, so the
/// list holds each type once, in first-seen order.
fn push_observed_trace_type(observed_types: &mut Vec<ColumnDataType>, datatype: ColumnDataType) {
if !observed_types.contains(&datatype) {
observed_types.push(datatype);
}
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Value};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use servers::query_handler::TraceIngestOutcome;
use super::{
ChunkFailureReaction, Instance, PendingTraceColumnRewrite, TraceReconcileDecision,
validate_trace_column_rewrites, wrap_trace_alter_failure,
};
use super::{ChunkFailureReaction, Instance, wrap_trace_alter_failure};
use crate::metrics::OTLP_TRACES_FAILURE_COUNT;
#[test]
@@ -1117,86 +958,6 @@ mod tests {
);
}
// Observed Int64 against an existing Int64 column keeps the existing type.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
assert_eq!(
Instance::choose_trace_reconcile_decision(
&[ColumnDataType::Int64],
Some(ColumnDataType::Int64)
)
.unwrap(),
Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
);
}
// A mix of Int64 and Float64 against an existing Int64 column asks for an alter to Float64.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
assert_eq!(
Instance::choose_trace_reconcile_decision(
&[ColumnDataType::Int64, ColumnDataType::Float64],
Some(ColumnDataType::Int64)
)
.unwrap(),
Some(TraceReconcileDecision::AlterExistingTo(
ColumnDataType::Float64
))
);
}
// An existing Float64 column is kept when the request mixes Int64 and Float64.
#[test]
fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
assert_eq!(
Instance::choose_trace_reconcile_decision(
&[ColumnDataType::Int64, ColumnDataType::Float64],
Some(ColumnDataType::Float64)
)
.unwrap(),
Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
);
}
// Boolean mixed with Int64 against an existing Int64 column is rejected as invalid arguments.
#[test]
fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
let err = Instance::choose_trace_reconcile_decision(
&[ColumnDataType::Boolean, ColumnDataType::Int64],
Some(ColumnDataType::Int64),
)
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
// With no existing column, an Int64/Float64 mix resolves to a request-local Float64.
#[test]
fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
assert_eq!(
Instance::choose_trace_reconcile_decision(
&[ColumnDataType::Int64, ColumnDataType::Float64],
None
)
.unwrap(),
Some(TraceReconcileDecision::UseRequestLocal(
ColumnDataType::Float64
))
);
}
// A string cell that cannot be coerced to the Int64 target fails validation
// with an invalid-arguments status.
#[test]
fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
let rows = vec![Row {
values: vec![Value {
value_data: Some(ValueData::StringValue("not_a_number".to_string())),
}],
}];
let pending_rewrites = vec![PendingTraceColumnRewrite {
col_idx: 0,
target_type: ColumnDataType::Int64,
column_name: "span_attributes.attr_int".to_string(),
}];
let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_wrap_trace_alter_failure_preserves_status_code() {
let err = wrap_trace_alter_failure(

View File

@@ -0,0 +1,308 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{ColumnDataType, Row};
use servers::error::{self, Result as ServerResult};
use servers::otlp::trace::coerce::{
coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
trace_value_datatype,
};
/// Result of reconciling the value types observed for a single trace column
/// with the type the column already has in the table, if any.
///
/// Whatever the variant, the payload is the column type the request data
/// should end up with; see [`TraceReconcileDecision::target_type`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum TraceReconcileDecision {
    /// The existing column type stays as it is.
    UseExisting(ColumnDataType),
    /// There is no existing column type; the type comes from the request itself.
    UseRequestLocal(ColumnDataType),
    /// The existing column has to be altered to the carried type.
    AlterExistingTo(ColumnDataType),
}
impl TraceReconcileDecision {
    /// Column type carried by this decision, independent of the variant.
    pub(super) fn target_type(self) -> ColumnDataType {
        // Every variant wraps exactly one `ColumnDataType`, so the or-pattern
        // is irrefutable and a plain `let` is enough.
        let (Self::UseExisting(column_type)
        | Self::UseRequestLocal(column_type)
        | Self::AlterExistingTo(column_type)) = self;
        column_type
    }

    /// Whether the decision asks for the existing column to be altered.
    ///
    /// Only [`Self::AlterExistingTo`] answers `true`.
    pub(super) fn requires_alter(self) -> bool {
        match self {
            Self::AlterExistingTo(_) => true,
            Self::UseExisting(_) | Self::UseRequestLocal(_) => false,
        }
    }
}
/// A column whose request cells still have to be rewritten to `target_type`.
///
/// Instances are checked by [`validate_trace_column_rewrites`], which test-coerces
/// every addressed cell to the target type.
///
/// `Debug` is derived so a pending rewrite can be logged or shown in assertion
/// output; all field types already implement it.
#[derive(Debug)]
pub(super) struct PendingTraceColumnRewrite {
    /// Position of the column's cell inside each row's `values`.
    pub(super) col_idx: usize,
    /// Type the column's cells are coerced to.
    pub(super) target_type: ColumnDataType,
    /// Column name; used when reporting a coercion failure.
    pub(super) column_name: String,
}
/// Decides how one trace column's observed request types are reconciled with
/// the table.
///
/// * No existing column type: the outcome is whatever
///   `resolve_new_trace_column_type` resolves from `observed_types`, wrapped in
///   [`TraceReconcileDecision::UseRequestLocal`] (or `None` when it resolves to
///   nothing).
/// * Existing column type and every observed type equals it or is a supported
///   coercion to it: [`TraceReconcileDecision::UseExisting`].
/// * Existing Int64 column, and the request holds only Int64/Float64 values
///   with at least one Float64: [`TraceReconcileDecision::AlterExistingTo`]
///   Float64.
///
/// # Errors
///
/// Any other combination yields an `InvalidParameter` error whose reason is
/// "unsupported trace type mix".
pub(super) fn choose_trace_reconcile_decision(
    observed_types: &[ColumnDataType],
    existing_type: Option<ColumnDataType>,
) -> ServerResult<Option<TraceReconcileDecision>> {
    // Single place that builds the rejection used by every failing path.
    let unsupported_mix = || {
        error::InvalidParameterSnafu {
            reason: "unsupported trace type mix".to_string(),
        }
        .build()
    };

    match existing_type {
        None => match resolve_new_trace_column_type(observed_types.iter().copied()) {
            Ok(resolved) => Ok(resolved.map(TraceReconcileDecision::UseRequestLocal)),
            Err(_) => Err(unsupported_mix()),
        },
        Some(current_type) => {
            let fits_current = observed_types.iter().all(|&incoming| {
                incoming == current_type || is_supported_trace_coercion(incoming, current_type)
            });
            if fits_current {
                return Ok(Some(TraceReconcileDecision::UseExisting(current_type)));
            }

            // The one schema change that is allowed: Int64 -> Float64.
            let widens_to_float64 = current_type == ColumnDataType::Int64
                && observed_types.contains(&ColumnDataType::Float64)
                && observed_types.iter().all(|&incoming| {
                    matches!(incoming, ColumnDataType::Int64 | ColumnDataType::Float64)
                });
            if widens_to_float64 {
                Ok(Some(TraceReconcileDecision::AlterExistingTo(
                    ColumnDataType::Float64,
                )))
            } else {
                Err(unsupported_mix())
            }
        }
    }
}
/// Validate all pending trace column rewrites before any schema mutation happens.
pub(super) fn validate_trace_column_rewrites(
rows: &[Row],
pending_rewrites: &[PendingTraceColumnRewrite],
table_name: &str,
) -> ServerResult<()> {
for row in rows {
for pending_rewrite in pending_rewrites {
let Some(value) = row.values.get(pending_rewrite.col_idx) else {
continue;
};
let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
else {
continue;
};
if request_type == pending_rewrite.target_type {
continue;
}
coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
.map_err(|_| {
error::InvalidParameterSnafu {
reason: format!(
"failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
pending_rewrite.column_name,
table_name,
request_type,
pending_rewrite.target_type
),
}
.build()
})?;
}
}
Ok(())
}
pub(super) fn enrich_trace_reconcile_error(
table_name: &str,
column_name: &str,
observed_types: &[ColumnDataType],
existing_type: Option<ColumnDataType>,
) -> servers::error::Error {
let observed_types = observed_types
.iter()
.map(|datatype| format!("{datatype:?}"))
.collect::<Vec<_>>()
.join(", ");
error::InvalidParameterSnafu {
reason: match existing_type {
Some(existing_type) => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
column_name, table_name, observed_types, existing_type
),
None => format!(
"failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
column_name, table_name, observed_types
),
},
}
.build()
}
/// Tells whether a column type takes part in trace type reconciliation.
///
/// Only the scalar types String, Boolean, Int64 and Float64 do. Every other
/// column kind (JSON, binary, ...) keeps its original write path and schema
/// checks.
pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
    const RECONCILE_CANDIDATES: [ColumnDataType; 4] = [
        ColumnDataType::String,
        ColumnDataType::Boolean,
        ColumnDataType::Int64,
        ColumnDataType::Float64,
    ];

    RECONCILE_CANDIDATES.contains(&datatype)
}
/// Records `datatype` in `observed_types` unless it is already present.
///
/// The list therefore stays free of duplicates and keeps first-seen order,
/// without relying on any ordering of the enum itself.
pub(super) fn push_observed_trace_type(
    observed_types: &mut Vec<ColumnDataType>,
    datatype: ColumnDataType,
) {
    if observed_types.contains(&datatype) {
        return;
    }
    observed_types.push(datatype);
}
/// Unit tests for the trace type reconciliation helpers.
///
/// Besides the decision and validation cases, the module covers the
/// `TraceReconcileDecision` accessors, the skip paths of
/// `validate_trace_column_rewrites`, both message shapes of
/// `enrich_trace_reconcile_error`, and all four reconcile candidate types.
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, Value};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::{
        PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision,
        enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type,
        validate_trace_column_rewrites,
    };

    #[test]
    fn test_trace_reconcile_decision_accessors() {
        let cases = [
            (
                TraceReconcileDecision::UseExisting(ColumnDataType::Int64),
                ColumnDataType::Int64,
                false,
            ),
            (
                TraceReconcileDecision::UseRequestLocal(ColumnDataType::String),
                ColumnDataType::String,
                false,
            ),
            (
                TraceReconcileDecision::AlterExistingTo(ColumnDataType::Float64),
                ColumnDataType::Float64,
                true,
            ),
        ];

        for (decision, expected_type, expected_alter) in cases {
            assert_eq!(decision.target_type(), expected_type);
            assert_eq!(decision.requires_alter(), expected_alter);
        }
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
        assert_eq!(
            choose_trace_reconcile_decision(&[ColumnDataType::Int64], Some(ColumnDataType::Int64))
                .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Int64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::AlterExistingTo(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
        assert_eq!(
            choose_trace_reconcile_decision(
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Float64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
        let err = choose_trace_reconcile_decision(
            &[ColumnDataType::Boolean, ColumnDataType::Int64],
            Some(ColumnDataType::Int64),
        )
        .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_number".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];
        let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_validate_trace_column_rewrites_skips_missing_and_null_cells() {
        // First row has no cell at the rewrite index, second row has a cell
        // without data; both are skipped rather than coerced.
        let rows = vec![
            Row { values: vec![] },
            Row {
                values: vec![Value { value_data: None }],
            },
        ];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];
        assert!(
            validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
                .is_ok()
        );
    }

    #[test]
    fn test_validate_trace_column_rewrites_accepts_empty_input() {
        assert!(validate_trace_column_rewrites(&[], &[], "trace_type_atomicity").is_ok());
    }

    #[test]
    fn test_enrich_trace_reconcile_error_includes_existing_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &[ColumnDataType::String, ColumnDataType::Int64],
            Some(ColumnDataType::Boolean),
        );
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.attr_int"));
        assert!(err.to_string().contains("Boolean"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_without_existing_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &[ColumnDataType::String, ColumnDataType::Int64],
            None,
        );
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);

        let message = err.to_string();
        assert!(message.contains("span_attributes.attr_int"));
        assert!(message.contains("trace_type_atomicity"));
        assert!(message.contains("String, Int64"));
        // The "against existing ..." suffix belongs to the `Some` branch only.
        assert!(!message.contains("against existing"));
    }

    #[test]
    fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::String));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Boolean));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Int64));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Float64));
        assert!(!is_trace_reconcile_candidate_type(ColumnDataType::Binary));
        assert!(!is_trace_reconcile_candidate_type(
            ColumnDataType::TimestampMillisecond
        ));
    }

    #[test]
    fn test_push_observed_trace_type_deduplicates_types() {
        let mut observed_types = Vec::new();
        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Float64);
        assert_eq!(
            observed_types,
            vec![ColumnDataType::Int64, ColumnDataType::Float64]
        );
    }
}