chore: add schema urls to otlp logs (#4876)

* chore: add schema urls to otlp logs table

* chore: update meter-macros version to remove anymap warning

* chore: change span id and trace id to field
This commit is contained in:
shuiyisong
2024-10-25 11:45:24 +08:00
committed by GitHub
parent 4e9c251041
commit 32a0023010
6 changed files with 219 additions and 193 deletions

View File

@@ -113,6 +113,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.plugins
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
self.handle_log_inserts(requests, ctx)
.await

View File

@@ -41,7 +41,7 @@ use snafu::prelude::*;
use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, Result};
use crate::error::{self, PipelineSnafu, Result};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
@@ -227,15 +227,9 @@ pub async fn logs(
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
let pipeline_way;
if let Some(pipeline_name) = &pipeline_info.pipeline_name {
let pipeline_way = if let Some(pipeline_name) = &pipeline_info.pipeline_name {
let pipeline_version =
to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
error::InvalidParameterSnafu {
reason: GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
}
.build()
})?;
to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?;
let pipeline = match handler
.get_pipeline(pipeline_name, pipeline_version, query_ctx.clone())
.await
@@ -245,10 +239,10 @@ pub async fn logs(
return Err(e);
}
};
pipeline_way = PipelineWay::Custom(pipeline);
PipelineWay::Custom(pipeline)
} else {
pipeline_way = PipelineWay::OtlpLog(Box::new(select_info));
}
PipelineWay::OtlpLog(Box::new(select_info))
};
handler
.logs(request, pipeline_way, table_info.table_name, query_ctx)

View File

@@ -185,47 +185,6 @@ fn log_to_pipeline_value(
fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
[
(
"scope_name",
ColumnDataType::String,
SemanticType::Tag,
None,
None,
),
(
"scope_version",
ColumnDataType::String,
SemanticType::Field,
None,
None,
),
(
"scope_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"resource_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"log_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"timestamp",
ColumnDataType::TimestampNanosecond,
@@ -233,30 +192,16 @@ fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
None,
None,
),
(
"observed_timestamp",
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
None,
None,
),
(
"trace_id",
ColumnDataType::String,
SemanticType::Tag,
SemanticType::Field,
None,
None,
),
(
"span_id",
ColumnDataType::String,
SemanticType::Tag,
None,
None,
),
(
"trace_flags",
ColumnDataType::Uint32,
SemanticType::Field,
None,
None,
@@ -287,6 +232,68 @@ fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
)]),
}),
),
(
"log_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"trace_flags",
ColumnDataType::Uint32,
SemanticType::Field,
None,
None,
),
(
"scope_name",
ColumnDataType::String,
SemanticType::Tag,
None,
None,
),
(
"scope_version",
ColumnDataType::String,
SemanticType::Field,
None,
None,
),
(
"scope_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"scope_schema_url",
ColumnDataType::String,
SemanticType::Field,
None,
None,
),
(
"resource_attributes",
ColumnDataType::Binary,
SemanticType::Field,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
None,
),
(
"resource_schema_url",
ColumnDataType::String,
SemanticType::Field,
None,
None,
),
]
.into_iter()
.map(
@@ -301,39 +308,17 @@ fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
.collect::<Vec<ColumnSchema>>()
}
fn build_otlp_build_in_row(
log: LogRecord,
resource_attr: JsonbValue<'static>,
scope_name: Option<String>,
scope_version: Option<String>,
scope_attrs: JsonbValue<'static>,
) -> Row {
fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row {
let log_attr = key_value_to_jsonb(log.attributes);
let ts = if log.time_unix_nano != 0 {
log.time_unix_nano
} else {
log.observed_time_unix_nano
};
let row = vec![
GreptimeValue {
value_data: scope_name.map(ValueData::StringValue),
},
GreptimeValue {
value_data: scope_version.map(ValueData::StringValue),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(
log.time_unix_nano as i64,
)),
},
GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(
log.observed_time_unix_nano as i64,
)),
value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
},
GreptimeValue {
value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
@@ -341,9 +326,6 @@ fn build_otlp_build_in_row(
GreptimeValue {
value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
},
GreptimeValue {
value_data: Some(ValueData::U32Value(log.flags)),
},
GreptimeValue {
value_data: Some(ValueData::StringValue(log.severity_text)),
},
@@ -356,6 +338,30 @@ fn build_otlp_build_in_row(
.as_ref()
.map(|x| ValueData::StringValue(log_body_to_string(x))),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::U32Value(log.flags)),
},
GreptimeValue {
value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
},
GreptimeValue {
value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
},
GreptimeValue {
value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
},
GreptimeValue {
value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
},
];
Row { values: row }
}
@@ -363,7 +369,7 @@ fn build_otlp_build_in_row(
fn extract_field_from_attr_and_combine_schema(
schema_info: &mut SchemaInfo,
log_select: &SelectInfo,
jsonb: &jsonb::Value<'static>,
jsonb: &jsonb::Value,
) -> Result<Vec<GreptimeValue>> {
if log_select.keys.is_empty() {
return Ok(Vec::new());
@@ -498,13 +504,12 @@ fn parse_export_logs_service_request_to_rows(
let mut extra_resource_schema = SchemaInfo::default();
let mut extra_scope_schema = SchemaInfo::default();
let mut extra_log_schema = SchemaInfo::default();
let parse_infos = parse_resource(
&select_info,
let mut parse_ctx = ParseContext::new(
&mut extra_resource_schema,
&mut extra_scope_schema,
&mut extra_log_schema,
request.resource_logs,
)?;
);
let parse_infos = parse_resource(&select_info, &mut parse_ctx, request.resource_logs)?;
// order of schema is important
// resource < scope < log
@@ -557,28 +562,27 @@ fn parse_export_logs_service_request_to_rows(
fn parse_resource(
select_info: &SelectInfo,
extra_resource_schema: &mut SchemaInfo,
extra_scope_schema: &mut SchemaInfo,
extra_log_schema: &mut SchemaInfo,
parse_ctx: &mut ParseContext,
resource_logs_vec: Vec<ResourceLogs>,
) -> Result<Vec<ParseInfo>> {
let mut results = Vec::new();
for r in resource_logs_vec {
let resource_attr = r
parse_ctx.resource_attr = r
.resource
.map(|resource| key_value_to_jsonb(resource.attributes))
.unwrap_or(JsonbValue::Null);
parse_ctx.resource_url = r.schema_url;
let resource_extracted_values = extract_field_from_attr_and_combine_schema(
extra_resource_schema,
parse_ctx.extra_resource_schema,
select_info,
&resource_attr,
&parse_ctx.resource_attr,
)?;
let rows = parse_scope(
extra_scope_schema,
extra_log_schema,
select_info,
r.scope_logs,
resource_attr,
parse_ctx,
resource_extracted_values,
)?;
results.extend(rows);
@@ -586,38 +590,65 @@ fn parse_resource(
Ok(results)
}
struct ScopeInfo {
struct ParseContext<'a> {
// selector schema
extra_resource_schema: &'a mut SchemaInfo,
extra_scope_schema: &'a mut SchemaInfo,
extra_log_schema: &'a mut SchemaInfo,
// passdown values
resource_url: String,
resource_attr: JsonbValue<'a>,
scope_name: Option<String>,
scope_version: Option<String>,
scope_attrs: JsonbValue<'static>,
scope_url: String,
scope_attrs: JsonbValue<'a>,
}
impl<'a> ParseContext<'a> {
pub fn new(
extra_resource_schema: &'a mut SchemaInfo,
extra_scope_schema: &'a mut SchemaInfo,
extra_log_schema: &'a mut SchemaInfo,
) -> ParseContext<'a> {
ParseContext {
extra_resource_schema,
extra_scope_schema,
extra_log_schema,
resource_url: String::new(),
resource_attr: JsonbValue::Null,
scope_name: None,
scope_version: None,
scope_url: String::new(),
scope_attrs: JsonbValue::Null,
}
}
}
fn parse_scope(
extra_scope_schema: &mut SchemaInfo,
extra_log_schema: &mut SchemaInfo,
select_info: &SelectInfo,
scopes_log_vec: Vec<ScopeLogs>,
resource_attr: JsonbValue<'static>,
parse_ctx: &mut ParseContext,
resource_extracted_values: Vec<GreptimeValue>,
) -> Result<Vec<ParseInfo>> {
let mut results = Vec::new();
for scope_logs in scopes_log_vec {
let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
parse_ctx.scope_name = scope_name;
parse_ctx.scope_version = scope_version;
parse_ctx.scope_attrs = scope_attrs;
parse_ctx.scope_url = scope_logs.schema_url;
let scope_extracted_values = extract_field_from_attr_and_combine_schema(
extra_scope_schema,
parse_ctx.extra_scope_schema,
select_info,
&scope_attrs,
&parse_ctx.scope_attrs,
)?;
let rows = parse_log(
extra_log_schema,
select_info,
scope_logs.log_records,
&resource_attr,
ScopeInfo {
scope_name,
scope_version,
scope_attrs,
},
parse_ctx,
&resource_extracted_values,
&scope_extracted_values,
)?;
@@ -627,15 +658,9 @@ fn parse_scope(
}
fn parse_log(
extra_log_schema: &mut SchemaInfo,
select_info: &SelectInfo,
log_records: Vec<LogRecord>,
resource_attr: &JsonbValue<'static>,
ScopeInfo {
scope_name,
scope_version,
scope_attrs,
}: ScopeInfo,
parse_ctx: &mut ParseContext,
resource_extracted_values: &[GreptimeValue],
scope_extracted_values: &[GreptimeValue],
) -> Result<Vec<ParseInfo>> {
@@ -644,16 +669,13 @@ fn parse_log(
for log in log_records {
let log_attr = key_value_to_jsonb(log.attributes.clone());
let row = build_otlp_build_in_row(
log,
resource_attr.clone(),
scope_name.clone(),
scope_version.clone(),
scope_attrs.clone(),
);
let row = build_otlp_build_in_row(log, parse_ctx);
let log_extracted_values =
extract_field_from_attr_and_combine_schema(extra_log_schema, select_info, &log_attr)?;
let log_extracted_values = extract_field_from_attr_and_combine_schema(
parse_ctx.extra_log_schema,
select_info,
&log_attr,
)?;
let parse_info = ParseInfo {
values: row,