mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 20:00:36 +00:00
feat: implement trace type whitelist (#7930)
* feat: implement trace type whitelist Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: use opentelemetry_semantic_conventions for key name Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add ref doc in the comments Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: fmt toml Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: introduce trace_semconv.rs for holding the mapping Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: update key list Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: fmt Signed-off-by: shuiyisong <xixing.sys@gmail.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -2756,7 +2756,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry-semantic-conventions 0.30.0",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
"parking_lot 0.12.4",
|
||||
"prometheus 0.14.0",
|
||||
@@ -5257,6 +5257,7 @@ dependencies = [
|
||||
"meta-srv",
|
||||
"num_cpus",
|
||||
"opentelemetry-proto 0.31.0",
|
||||
"opentelemetry-semantic-conventions 0.31.0",
|
||||
"operator",
|
||||
"otel-arrow-rust",
|
||||
"partition",
|
||||
@@ -8997,6 +8998,12 @@ version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.30.0"
|
||||
|
||||
@@ -58,6 +58,7 @@ log-query.workspace = true
|
||||
meta-client.workspace = true
|
||||
num_cpus.workspace = true
|
||||
opentelemetry-proto.workspace = true
|
||||
opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] }
|
||||
operator.workspace = true
|
||||
otel-arrow-rust.workspace = true
|
||||
partition.workspace = true
|
||||
|
||||
@@ -46,6 +46,7 @@ use snafu::{IntoError, ResultExt};
|
||||
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
|
||||
|
||||
use crate::instance::Instance;
|
||||
use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
|
||||
use crate::instance::otlp::trace_types::{
|
||||
PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error,
|
||||
is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites,
|
||||
@@ -54,6 +55,7 @@ use crate::metrics::{
|
||||
OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS,
|
||||
};
|
||||
|
||||
pub mod trace_semconv;
|
||||
pub mod trace_types;
|
||||
|
||||
const TRACE_INGEST_CHUNK_SIZE: usize = 64;
|
||||
@@ -678,6 +680,7 @@ impl Instance {
|
||||
.ok()
|
||||
.map(|wrapper| wrapper.datatype())
|
||||
});
|
||||
let fixed_type = trace_semconv_fixed_type(&col_schema.column_name);
|
||||
|
||||
if !observed_types
|
||||
.iter()
|
||||
@@ -686,23 +689,27 @@ impl Instance {
|
||||
&& existing_type
|
||||
.map(|datatype| !is_trace_reconcile_candidate_type(datatype))
|
||||
.unwrap_or(true)
|
||||
&& fixed_type.is_none()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Decide the final type once per column, then rewrite all affected cells
|
||||
// together in one row pass below.
|
||||
let Some(decision) =
|
||||
choose_trace_reconcile_decision(&observed_types, existing_type).map_err(
|
||||
|_| {
|
||||
enrich_trace_reconcile_error(
|
||||
&req.table_name,
|
||||
&col_schema.column_name,
|
||||
&observed_types,
|
||||
existing_type,
|
||||
)
|
||||
},
|
||||
)?
|
||||
let Some(decision) = choose_trace_reconcile_decision(
|
||||
&col_schema.column_name,
|
||||
&observed_types,
|
||||
existing_type,
|
||||
)
|
||||
.map_err(|_| {
|
||||
enrich_trace_reconcile_error(
|
||||
&req.table_name,
|
||||
&col_schema.column_name,
|
||||
&observed_types,
|
||||
existing_type,
|
||||
fixed_type,
|
||||
)
|
||||
})?
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
308
src/frontend/src/instance/otlp/trace_semconv.rs
Normal file
308
src/frontend/src/instance/otlp/trace_semconv.rs
Normal file
@@ -0,0 +1,308 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::ColumnDataType;
|
||||
use opentelemetry_semantic_conventions::{attribute, resource};
|
||||
|
||||
/// Returns fixed scalar types for flattened trace semconv columns.
|
||||
///
|
||||
/// The mapping is maintained from the official OpenTelemetry semantic
|
||||
/// conventions docs under `docs/` and `docs/registry/attributes/` in:
|
||||
/// https://github.com/open-telemetry/semantic-conventions/tree/main/docs
|
||||
///
|
||||
/// Only attributes whose docs mark them as `Stable` or `Release Candidate` are
|
||||
/// included here. `Development` and lower-stability attributes must keep the
|
||||
/// dynamic reconciliation path instead.
|
||||
pub(super) fn trace_semconv_fixed_type(column_name: &str) -> Option<ColumnDataType> {
|
||||
if let Some(resource_attribute) = column_name.strip_prefix("resource_attributes.") {
|
||||
return match resource_attribute {
|
||||
// Service resource attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/service.md
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/service.md
|
||||
resource::SERVICE_NAME
|
||||
| resource::SERVICE_INSTANCE_ID
|
||||
| resource::SERVICE_NAMESPACE
|
||||
| resource::SERVICE_VERSION => Some(ColumnDataType::String),
|
||||
|
||||
// Telemetry SDK resource attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/README.md
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/telemetry.md
|
||||
resource::TELEMETRY_SDK_LANGUAGE
|
||||
| resource::TELEMETRY_SDK_NAME
|
||||
| resource::TELEMETRY_SDK_VERSION => Some(ColumnDataType::String),
|
||||
|
||||
// Container resource attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/container.md
|
||||
resource::CONTAINER_ID => Some(ColumnDataType::String),
|
||||
|
||||
// Browser resource attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/browser.md
|
||||
resource::USER_AGENT_ORIGINAL => Some(ColumnDataType::String),
|
||||
|
||||
// Kubernetes resource attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s/README.md
|
||||
resource::K8S_CLUSTER_NAME
|
||||
| resource::K8S_CLUSTER_UID
|
||||
| resource::K8S_CONTAINER_NAME
|
||||
| resource::K8S_CRONJOB_NAME
|
||||
| resource::K8S_CRONJOB_UID
|
||||
| resource::K8S_DAEMONSET_NAME
|
||||
| resource::K8S_DAEMONSET_UID
|
||||
| resource::K8S_DEPLOYMENT_NAME
|
||||
| resource::K8S_DEPLOYMENT_UID
|
||||
| resource::K8S_JOB_NAME
|
||||
| resource::K8S_JOB_UID
|
||||
| resource::K8S_NAMESPACE_NAME
|
||||
| resource::K8S_NODE_NAME
|
||||
| resource::K8S_NODE_UID
|
||||
| resource::K8S_POD_NAME
|
||||
| resource::K8S_POD_UID
|
||||
| resource::K8S_REPLICASET_NAME
|
||||
| resource::K8S_REPLICASET_UID
|
||||
| resource::K8S_STATEFULSET_NAME
|
||||
| resource::K8S_STATEFULSET_UID
|
||||
// The current docs include these `k8s.pod.*` keys, but crate 0.31
|
||||
// does not export constants for them yet.
|
||||
| "k8s.pod.hostname"
|
||||
| "k8s.pod.ip"
|
||||
| "k8s.pod.start_time" => Some(ColumnDataType::String),
|
||||
resource::K8S_CONTAINER_RESTART_COUNT => Some(ColumnDataType::Int64),
|
||||
|
||||
_ => None,
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(span_attribute) = column_name.strip_prefix("span_attributes.") {
|
||||
return match span_attribute {
|
||||
// General client, server, and network attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/attributes.md
|
||||
attribute::CLIENT_ADDRESS
|
||||
| attribute::SERVER_ADDRESS
|
||||
| attribute::NETWORK_LOCAL_ADDRESS
|
||||
| attribute::NETWORK_PEER_ADDRESS
|
||||
| attribute::NETWORK_PROTOCOL_NAME
|
||||
| attribute::NETWORK_PROTOCOL_VERSION
|
||||
| attribute::NETWORK_TRANSPORT
|
||||
| attribute::NETWORK_TYPE => Some(ColumnDataType::String),
|
||||
attribute::CLIENT_PORT
|
||||
| attribute::SERVER_PORT
|
||||
| attribute::NETWORK_LOCAL_PORT
|
||||
| attribute::NETWORK_PEER_PORT => Some(ColumnDataType::Int64),
|
||||
|
||||
// HTTP attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/http.md
|
||||
attribute::HTTP_REQUEST_METHOD
|
||||
| attribute::HTTP_REQUEST_METHOD_ORIGINAL
|
||||
| attribute::HTTP_ROUTE
|
||||
| attribute::URL_FULL
|
||||
| attribute::URL_PATH
|
||||
| attribute::URL_QUERY
|
||||
| attribute::URL_SCHEME
|
||||
| attribute::USER_AGENT_ORIGINAL => Some(ColumnDataType::String),
|
||||
attribute::HTTP_REQUEST_RESEND_COUNT | attribute::HTTP_RESPONSE_STATUS_CODE => {
|
||||
Some(ColumnDataType::Int64)
|
||||
}
|
||||
|
||||
// RPC attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/rpc.md
|
||||
attribute::RPC_METHOD
|
||||
| attribute::RPC_SYSTEM
|
||||
// The current docs renamed this attribute to `rpc.system.name`,
|
||||
// but crate 0.31 still exports `RPC_SYSTEM = "rpc.system"`.
|
||||
| "rpc.system.name"
|
||||
| "rpc.method_original"
|
||||
| "rpc.response.status_code" => Some(ColumnDataType::String),
|
||||
|
||||
// General database attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/db.md
|
||||
attribute::DB_COLLECTION_NAME
|
||||
| attribute::DB_NAMESPACE
|
||||
| attribute::DB_OPERATION_NAME
|
||||
| attribute::DB_QUERY_SUMMARY
|
||||
| attribute::DB_QUERY_TEXT
|
||||
| attribute::DB_RESPONSE_STATUS_CODE
|
||||
| attribute::DB_STORED_PROCEDURE_NAME
|
||||
| attribute::DB_SYSTEM_NAME => Some(ColumnDataType::String),
|
||||
attribute::DB_OPERATION_BATCH_SIZE => Some(ColumnDataType::Int64),
|
||||
|
||||
// Error attributes:
|
||||
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/error.md
|
||||
attribute::ERROR_TYPE => Some(ColumnDataType::String),
|
||||
|
||||
_ => None,
|
||||
};
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::ColumnDataType;
|
||||
use opentelemetry_semantic_conventions::{attribute, resource};
|
||||
|
||||
use super::trace_semconv_fixed_type;
|
||||
|
||||
fn resource_column(key: &str) -> String {
|
||||
format!("resource_attributes.{key}")
|
||||
}
|
||||
|
||||
fn span_column(key: &str) -> String {
|
||||
format!("span_attributes.{key}")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_includes_stable_service_key() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::SERVICE_NAME)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::SERVICE_VERSION)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::SERVICE_INSTANCE_ID)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::SERVICE_NAMESPACE)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_includes_http_server_and_error_keys() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::HTTP_RESPONSE_STATUS_CODE)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::SERVER_PORT)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::ERROR_TYPE)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::CLIENT_ADDRESS)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::CLIENT_PORT)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::URL_FULL)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::URL_PATH)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::URL_QUERY)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::URL_SCHEME)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::USER_AGENT_ORIGINAL)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_includes_rc_rpc_key() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::RPC_SYSTEM)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type("span_attributes.rpc.system.name"),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column("rpc.response.status_code")),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_includes_db_and_network_keys() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::DB_SYSTEM_NAME)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::DB_OPERATION_BATCH_SIZE)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column(attribute::NETWORK_PEER_PORT)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_includes_resource_semconv_keys() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::CONTAINER_ID)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::K8S_CONTAINER_RESTART_COUNT)),
|
||||
Some(ColumnDataType::Int64)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_LANGUAGE)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_NAME)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column(resource::TELEMETRY_SDK_VERSION)),
|
||||
Some(ColumnDataType::String)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_excludes_development_keys() {
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column("messaging.system")),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&span_column("rpc.request.metadata.x-request-id")),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
trace_semconv_fixed_type(&resource_column("k8s.pod.label.app")),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_trace_semconv_fixed_type_unknown_key() {
|
||||
assert_eq!(trace_semconv_fixed_type(&span_column("custom.attr")), None);
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,8 @@ use servers::otlp::trace::coerce::{
|
||||
trace_value_datatype,
|
||||
};
|
||||
|
||||
use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) enum TraceReconcileDecision {
|
||||
UseExisting(ColumnDataType),
|
||||
@@ -51,9 +53,14 @@ pub(super) struct PendingTraceColumnRewrite {
|
||||
/// Existing table schema is authoritative unless the only incompatible case is
|
||||
/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data.
|
||||
pub(super) fn choose_trace_reconcile_decision(
|
||||
column_name: &str,
|
||||
observed_types: &[ColumnDataType],
|
||||
existing_type: Option<ColumnDataType>,
|
||||
) -> ServerResult<Option<TraceReconcileDecision>> {
|
||||
if let Some(fixed_type) = trace_semconv_fixed_type(column_name) {
|
||||
return choose_fixed_trace_reconcile_decision(fixed_type, observed_types, existing_type);
|
||||
}
|
||||
|
||||
let Some(existing_type) = existing_type else {
|
||||
return resolve_new_trace_column_type(observed_types.iter().copied())
|
||||
.map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
|
||||
@@ -91,6 +98,37 @@ pub(super) fn choose_trace_reconcile_decision(
|
||||
.fail()
|
||||
}
|
||||
|
||||
fn choose_fixed_trace_reconcile_decision(
|
||||
fixed_type: ColumnDataType,
|
||||
observed_types: &[ColumnDataType],
|
||||
existing_type: Option<ColumnDataType>,
|
||||
) -> ServerResult<Option<TraceReconcileDecision>> {
|
||||
let Some(existing_type) = existing_type else {
|
||||
return Ok(Some(TraceReconcileDecision::UseRequestLocal(fixed_type)));
|
||||
};
|
||||
|
||||
if existing_type == fixed_type {
|
||||
return Ok(Some(TraceReconcileDecision::UseExisting(fixed_type)));
|
||||
}
|
||||
|
||||
if fixed_type == ColumnDataType::Float64
|
||||
&& existing_type == ColumnDataType::Int64
|
||||
&& observed_types.iter().all(|observed_type| {
|
||||
matches!(
|
||||
observed_type,
|
||||
ColumnDataType::Int64 | ColumnDataType::Float64
|
||||
)
|
||||
})
|
||||
{
|
||||
return Ok(Some(TraceReconcileDecision::AlterExistingTo(fixed_type)));
|
||||
}
|
||||
|
||||
error::InvalidParameterSnafu {
|
||||
reason: "unsupported trace type mix".to_string(),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Validate all pending trace column rewrites before any schema mutation happens.
|
||||
pub(super) fn validate_trace_column_rewrites(
|
||||
rows: &[Row],
|
||||
@@ -134,6 +172,7 @@ pub(super) fn enrich_trace_reconcile_error(
|
||||
column_name: &str,
|
||||
observed_types: &[ColumnDataType],
|
||||
existing_type: Option<ColumnDataType>,
|
||||
fixed_type: Option<ColumnDataType>,
|
||||
) -> servers::error::Error {
|
||||
let observed_types = observed_types
|
||||
.iter()
|
||||
@@ -142,12 +181,20 @@ pub(super) fn enrich_trace_reconcile_error(
|
||||
.join(", ");
|
||||
|
||||
error::InvalidParameterSnafu {
|
||||
reason: match existing_type {
|
||||
Some(existing_type) => format!(
|
||||
reason: match (existing_type, fixed_type) {
|
||||
(Some(existing_type), Some(fixed_type)) => format!(
|
||||
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?} and fixed semconv {:?}",
|
||||
column_name, table_name, observed_types, existing_type, fixed_type
|
||||
),
|
||||
(Some(existing_type), None) => format!(
|
||||
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
|
||||
column_name, table_name, observed_types, existing_type
|
||||
),
|
||||
None => format!(
|
||||
(None, Some(fixed_type)) => format!(
|
||||
"failed to reconcile trace column '{}' in table '{}' with observed types [{}] and fixed semconv {:?}",
|
||||
column_name, table_name, observed_types, fixed_type
|
||||
),
|
||||
(None, None) => format!(
|
||||
"failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
|
||||
column_name, table_name, observed_types
|
||||
),
|
||||
@@ -194,8 +241,12 @@ mod tests {
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(&[ColumnDataType::Int64], Some(ColumnDataType::Int64))
|
||||
.unwrap(),
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_int",
|
||||
&[ColumnDataType::Int64],
|
||||
Some(ColumnDataType::Int64)
|
||||
)
|
||||
.unwrap(),
|
||||
Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
|
||||
);
|
||||
}
|
||||
@@ -204,6 +255,7 @@ mod tests {
|
||||
fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_double",
|
||||
&[ColumnDataType::Int64, ColumnDataType::Float64],
|
||||
Some(ColumnDataType::Int64)
|
||||
)
|
||||
@@ -218,6 +270,7 @@ mod tests {
|
||||
fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_double",
|
||||
&[ColumnDataType::Int64, ColumnDataType::Float64],
|
||||
Some(ColumnDataType::Float64)
|
||||
)
|
||||
@@ -229,6 +282,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
|
||||
let err = choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_numeric",
|
||||
&[ColumnDataType::Boolean, ColumnDataType::Int64],
|
||||
Some(ColumnDataType::Int64),
|
||||
)
|
||||
@@ -240,6 +294,76 @@ mod tests {
|
||||
fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_numeric",
|
||||
&[ColumnDataType::Int64, ColumnDataType::Float64],
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
Some(TraceReconcileDecision::UseRequestLocal(
|
||||
ColumnDataType::Float64
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_whitelisted_new_int64_column_uses_fixed_type() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.http.response.status_code",
|
||||
&[ColumnDataType::String, ColumnDataType::Int64],
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
Some(TraceReconcileDecision::UseRequestLocal(
|
||||
ColumnDataType::Int64
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_new_boolean_column_uses_dynamic_resolution() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.messaging.destination.temporary",
|
||||
&[ColumnDataType::String, ColumnDataType::Boolean],
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
Some(TraceReconcileDecision::UseRequestLocal(
|
||||
ColumnDataType::Boolean
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_whitelisted_existing_matching_type_uses_fixed_type() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"resource_attributes.service.name",
|
||||
&[ColumnDataType::String],
|
||||
Some(ColumnDataType::String)
|
||||
)
|
||||
.unwrap(),
|
||||
Some(TraceReconcileDecision::UseExisting(ColumnDataType::String))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_whitelisted_existing_conflicting_type_is_error() {
|
||||
let err = choose_trace_reconcile_decision(
|
||||
"span_attributes.server.port",
|
||||
&[ColumnDataType::Int64],
|
||||
Some(ColumnDataType::String),
|
||||
)
|
||||
.unwrap_err();
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_choose_trace_reconcile_decision_non_whitelisted_retains_dynamic_behavior() {
|
||||
assert_eq!(
|
||||
choose_trace_reconcile_decision(
|
||||
"span_attributes.attr_numeric",
|
||||
&[ColumnDataType::Int64, ColumnDataType::Float64],
|
||||
None
|
||||
)
|
||||
@@ -268,6 +392,40 @@ mod tests {
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_trace_column_rewrites_whitelisted_values_validate_against_fixed_type() {
|
||||
let rows = vec![Row {
|
||||
values: vec![Value {
|
||||
value_data: Some(ValueData::StringValue("503".to_string())),
|
||||
}],
|
||||
}];
|
||||
let pending_rewrites = vec![PendingTraceColumnRewrite {
|
||||
col_idx: 0,
|
||||
target_type: ColumnDataType::Int64,
|
||||
column_name: "span_attributes.http.response.status_code".to_string(),
|
||||
}];
|
||||
|
||||
validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_trace_column_rewrites_whitelisted_boolean_rejects_invalid_string_parse() {
|
||||
let rows = vec![Row {
|
||||
values: vec![Value {
|
||||
value_data: Some(ValueData::StringValue("not_a_bool".to_string())),
|
||||
}],
|
||||
}];
|
||||
let pending_rewrites = vec![PendingTraceColumnRewrite {
|
||||
col_idx: 0,
|
||||
target_type: ColumnDataType::Boolean,
|
||||
column_name: "span_attributes.messaging.destination.temporary".to_string(),
|
||||
}];
|
||||
|
||||
let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
|
||||
.unwrap_err();
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_enrich_trace_reconcile_error_includes_existing_type() {
|
||||
let err = enrich_trace_reconcile_error(
|
||||
@@ -275,6 +433,7 @@ mod tests {
|
||||
"span_attributes.attr_int",
|
||||
&[ColumnDataType::String, ColumnDataType::Int64],
|
||||
Some(ColumnDataType::Boolean),
|
||||
None,
|
||||
);
|
||||
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
@@ -282,6 +441,21 @@ mod tests {
|
||||
assert!(err.to_string().contains("Boolean"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_enrich_trace_reconcile_error_includes_fixed_semconv_type() {
|
||||
let err = enrich_trace_reconcile_error(
|
||||
"trace_type_atomicity",
|
||||
"span_attributes.server.port",
|
||||
&[ColumnDataType::String, ColumnDataType::Int64],
|
||||
Some(ColumnDataType::String),
|
||||
Some(ColumnDataType::Int64),
|
||||
);
|
||||
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
assert!(err.to_string().contains("span_attributes.server.port"));
|
||||
assert!(err.to_string().contains("fixed semconv Int64"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
|
||||
assert!(is_trace_reconcile_candidate_type(ColumnDataType::String));
|
||||
|
||||
Reference in New Issue
Block a user