mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
proxy: replace serde_json with our new json ser crate in the logging impl (#12602)
This doesn't solve any particular problem, but it does simplify some of the code that was forced to round-trip through verbose Serialize impls.
This commit is contained in:
@@ -6,7 +6,6 @@ use std::{env, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
@@ -16,7 +15,9 @@ use tracing_subscriber::fmt::time::SystemTime;
|
||||
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -249,7 +250,7 @@ where
|
||||
// early, before OTel machinery, and add as event extension.
|
||||
let now = self.clock.now();
|
||||
|
||||
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
|
||||
EVENT_FORMATTER.with(|f| {
|
||||
let mut borrow = f.try_borrow_mut();
|
||||
let formatter = match borrow.as_deref_mut() {
|
||||
Ok(formatter) => formatter,
|
||||
@@ -259,31 +260,19 @@ where
|
||||
Err(_) => &mut EventFormatter::new(),
|
||||
};
|
||||
|
||||
formatter.reset();
|
||||
formatter.format(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
});
|
||||
);
|
||||
|
||||
// In case logging fails we generate a simpler JSON object.
|
||||
if let Err(err) = res
|
||||
&& let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
|
||||
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
|
||||
"level": "ERROR",
|
||||
"message": format_args!("cannot log event: {err:?}"),
|
||||
"fields": {
|
||||
"event": format_args!("{event:?}"),
|
||||
},
|
||||
}))
|
||||
{
|
||||
line.push(b'\n');
|
||||
self.writer.make_writer().write_all(&line).ok();
|
||||
}
|
||||
let mut writer = self.writer.make_writer();
|
||||
if writer.write_all(formatter.buffer()).is_err() {
|
||||
Metrics::get().proxy.logging_errors_count.inc();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
@@ -382,9 +371,24 @@ impl CallsiteSpanInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RawValue(Box<[u8]>);
|
||||
|
||||
impl RawValue {
|
||||
fn new(v: impl json::ValueEncoder) -> Self {
|
||||
Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
|
||||
}
|
||||
}
|
||||
|
||||
impl json::ValueEncoder for &RawValue {
|
||||
fn encode(self, v: json::ValueSer<'_>) {
|
||||
v.write_raw_json(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
struct SpanFields {
|
||||
values: [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
values: [Option<RawValue>; MAX_TRACING_FIELDS],
|
||||
|
||||
/// cached span info so we can avoid extra hashmap lookups in the hot path.
|
||||
span_info: CallsiteSpanInfo,
|
||||
@@ -394,7 +398,7 @@ impl SpanFields {
|
||||
fn new(span_info: CallsiteSpanInfo) -> Self {
|
||||
Self {
|
||||
span_info,
|
||||
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
|
||||
values: [const { None }; MAX_TRACING_FIELDS],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -402,55 +406,55 @@ impl SpanFields {
|
||||
impl tracing::field::Visit for SpanFields {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if let Ok(value) = i64::try_from(value) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
} else {
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if let Ok(value) = u64::try_from(value) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
} else {
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
self.values[field.index()] = Some(RawValue::new(value));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
|
||||
self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -459,7 +463,7 @@ impl tracing::field::Visit for SpanFields {
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -508,11 +512,6 @@ impl EventFormatter {
|
||||
&self.logline_buffer
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn reset(&mut self) {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
@@ -520,8 +519,7 @@ impl EventFormatter {
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
|
||||
extract_fields: &'static [&'static str],
|
||||
) -> io::Result<()>
|
||||
where
|
||||
) where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
|
||||
@@ -536,78 +534,99 @@ impl EventFormatter {
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut serialize = || {
|
||||
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
|
||||
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
self.logline_buffer.clear();
|
||||
let serializer = json::ValueSer::new(&mut self.logline_buffer);
|
||||
json::value_as_object!(|serializer| {
|
||||
// Timestamp comes first, so raw lines can be sorted by timestamp.
|
||||
serializer.serialize_entry("timestamp", ×tamp)?;
|
||||
serializer.entry("timestamp", &*timestamp);
|
||||
|
||||
// Level next.
|
||||
serializer.serialize_entry("level", &meta.level().as_str())?;
|
||||
serializer.entry("level", meta.level().as_str());
|
||||
|
||||
// Message next.
|
||||
serializer.serialize_key("message")?;
|
||||
let mut message_extractor =
|
||||
MessageFieldExtractor::new(serializer, skipped_field_indices);
|
||||
MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
|
||||
event.record(&mut message_extractor);
|
||||
let mut serializer = message_extractor.into_serializer()?;
|
||||
message_extractor.finish();
|
||||
|
||||
// Direct message fields.
|
||||
let mut fields_present = FieldsPresent(false, skipped_field_indices);
|
||||
event.record(&mut fields_present);
|
||||
if fields_present.0 {
|
||||
serializer.serialize_entry(
|
||||
"fields",
|
||||
&SerializableEventFields(event, skipped_field_indices),
|
||||
)?;
|
||||
{
|
||||
let mut message_skipper = MessageFieldSkipper::new(
|
||||
serializer.key("fields").object(),
|
||||
skipped_field_indices,
|
||||
);
|
||||
event.record(&mut message_skipper);
|
||||
|
||||
// rollback if no fields are present.
|
||||
if message_skipper.present {
|
||||
message_skipper.serializer.finish();
|
||||
}
|
||||
}
|
||||
|
||||
let spans = SerializableSpans {
|
||||
// collect all spans from parent to root.
|
||||
spans: ctx
|
||||
let mut extracted = ExtractedSpanFields::new(extract_fields);
|
||||
|
||||
let spans = serializer.key("spans");
|
||||
json::value_as_object!(|spans| {
|
||||
let parent_spans = ctx
|
||||
.event_span(event)
|
||||
.map_or(vec![], |parent| parent.scope().collect()),
|
||||
extracted: ExtractedSpanFields::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
.map_or(vec![], |parent| parent.scope().collect());
|
||||
|
||||
for span in parent_spans.iter().rev() {
|
||||
let ext = span.extensions();
|
||||
|
||||
// all spans should have this extension.
|
||||
let Some(fields) = ext.get() else { continue };
|
||||
|
||||
extracted.layer_span(fields);
|
||||
|
||||
let SpanFields { values, span_info } = fields;
|
||||
|
||||
let span_fields = spans.key(&*span_info.normalized_name);
|
||||
json::value_as_object!(|span_fields| {
|
||||
for (field, value) in std::iter::zip(span.metadata().fields(), values) {
|
||||
if let Some(value) = value {
|
||||
span_fields.entry(field.name(), value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// TODO: thread-local cache?
|
||||
let pid = std::process::id();
|
||||
// Skip adding pid 1 to reduce noise for services running in containers.
|
||||
if pid != 1 {
|
||||
serializer.serialize_entry("process_id", &pid)?;
|
||||
serializer.entry("process_id", pid);
|
||||
}
|
||||
|
||||
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
|
||||
THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
|
||||
|
||||
// TODO: tls cache? name could change
|
||||
if let Some(thread_name) = std::thread::current().name()
|
||||
&& !thread_name.is_empty()
|
||||
&& thread_name != "tokio-runtime-worker"
|
||||
{
|
||||
serializer.serialize_entry("thread_name", thread_name)?;
|
||||
serializer.entry("thread_name", thread_name);
|
||||
}
|
||||
|
||||
if let Some(task_id) = tokio::task::try_id() {
|
||||
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
|
||||
serializer.entry("task_id", format_args!("{task_id}"));
|
||||
}
|
||||
|
||||
serializer.serialize_entry("target", meta.target())?;
|
||||
serializer.entry("target", meta.target());
|
||||
|
||||
// Skip adding module if it's the same as target.
|
||||
if let Some(module) = meta.module_path()
|
||||
&& module != meta.target()
|
||||
{
|
||||
serializer.serialize_entry("module", module)?;
|
||||
serializer.entry("module", module);
|
||||
}
|
||||
|
||||
if let Some(file) = meta.file() {
|
||||
if let Some(line) = meta.line() {
|
||||
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
|
||||
serializer.entry("src", format_args!("{file}:{line}"));
|
||||
} else {
|
||||
serializer.serialize_entry("src", file)?;
|
||||
serializer.entry("src", file);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -616,124 +635,104 @@ impl EventFormatter {
|
||||
let otel_spanref = otel_context.span();
|
||||
let span_context = otel_spanref.span_context();
|
||||
if span_context.is_valid() {
|
||||
serializer.serialize_entry(
|
||||
"trace_id",
|
||||
&format_args!("{}", span_context.trace_id()),
|
||||
)?;
|
||||
serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
|
||||
}
|
||||
}
|
||||
|
||||
if spans.extracted.has_values() {
|
||||
if extracted.has_values() {
|
||||
// TODO: add fields from event, too?
|
||||
serializer.serialize_entry("extract", &spans.extracted)?;
|
||||
let extract = serializer.key("extract");
|
||||
json::value_as_object!(|extract| {
|
||||
for (key, value) in std::iter::zip(extracted.names, extracted.values) {
|
||||
if let Some(value) = value {
|
||||
extract.entry(*key, &value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
serializer.end()
|
||||
};
|
||||
|
||||
serialize().map_err(io::Error::other)?;
|
||||
self.logline_buffer.push(b'\n');
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the message field that's mixed will other fields.
|
||||
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
struct MessageFieldExtractor<'buf> {
|
||||
serializer: Option<json::ValueSer<'buf>>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Option<Result<(), S::Error>>,
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
|
||||
impl<'buf> MessageFieldExtractor<'buf> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
serializer: Some(serializer),
|
||||
skipped_field_indices,
|
||||
state: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(mut self) -> Result<S, S::Error> {
|
||||
match self.state {
|
||||
Some(Ok(())) => {}
|
||||
Some(Err(err)) => return Err(err),
|
||||
None => self.serializer.serialize_value("")?,
|
||||
fn finish(self) {
|
||||
if let Some(ser) = self.serializer {
|
||||
ser.value("");
|
||||
}
|
||||
Ok(self.serializer)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_none()
|
||||
&& field.name() == MESSAGE_FIELD
|
||||
fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
|
||||
if field.name() == MESSAGE_FIELD
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
&& let Some(ser) = self.serializer.take()
|
||||
{
|
||||
ser.value(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
|
||||
impl tracing::field::Visit for MessageFieldExtractor<'_> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
|
||||
}
|
||||
self.record_field(field, format_args!("{value:x?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&value));
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
|
||||
}
|
||||
self.record_field(field, format_args!("{value:?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -742,147 +741,83 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if there's any fields and field values present. If not, the JSON subobject
|
||||
/// can be skipped.
|
||||
// This is entirely optional and only cosmetic, though maybe helps a
|
||||
// bit during log parsing in dashboards when there's no field with empty object.
|
||||
struct FieldsPresent(pub bool, SkippedFieldIndices);
|
||||
|
||||
// Even though some methods have an overhead (error, bytes) it is assumed the
|
||||
// compiler won't include this since we ignore the value entirely.
|
||||
impl tracing::field::Visit for FieldsPresent {
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
||||
if !self.1.contains(field.index())
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
{
|
||||
self.0 |= true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the fields directly supplied with a log event.
|
||||
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
|
||||
|
||||
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let serializer = serializer.serialize_map(None)?;
|
||||
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
|
||||
self.0.record(&mut message_skipper);
|
||||
let serializer = message_skipper.into_serializer()?;
|
||||
serializer.end()
|
||||
self.record_field(field, format_args!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
/// A tracing field visitor that skips the message field.
|
||||
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
struct MessageFieldSkipper<'buf> {
|
||||
serializer: json::ObjectSer<'buf>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Result<(), S::Error>,
|
||||
present: bool,
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
|
||||
impl<'buf> MessageFieldSkipper<'buf> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
state: Ok(()),
|
||||
present: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_ok()
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
|
||||
if field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_serializer(self) -> Result<S, S::Error> {
|
||||
self.state?;
|
||||
Ok(self.serializer)
|
||||
{
|
||||
self.serializer.entry(field.name(), v);
|
||||
self.present |= true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
|
||||
impl tracing::field::Visit for MessageFieldSkipper<'_> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:x?}"));
|
||||
}
|
||||
self.record_field(field, format_args!("{value:x?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_entry(field.name(), &value);
|
||||
}
|
||||
self.record_field(field, value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self
|
||||
.serializer
|
||||
.serialize_entry(field.name(), &format_args!("{value:?}"));
|
||||
}
|
||||
self.record_field(field, format_args!("{value:?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -891,131 +826,40 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
if self.accept_field(field) {
|
||||
self.state = self.serializer.serialize_value(&format_args!("{value}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the span stack from root to leaf (parent of event) as object
|
||||
/// with the span names as keys. To prevent collision we append a numberic value
|
||||
/// to the name. Also, collects any span fields we're interested in. Last one
|
||||
/// wins.
|
||||
struct SerializableSpans<'ctx, S>
|
||||
where
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
spans: Vec<SpanRef<'ctx, S>>,
|
||||
extracted: ExtractedSpanFields,
|
||||
}
|
||||
|
||||
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
|
||||
where
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
for span in self.spans.iter().rev() {
|
||||
let ext = span.extensions();
|
||||
|
||||
// all spans should have this extension.
|
||||
let Some(fields) = ext.get() else { continue };
|
||||
|
||||
self.extracted.layer_span(fields);
|
||||
|
||||
let SpanFields { values, span_info } = fields;
|
||||
serializer.serialize_entry(
|
||||
&*span_info.normalized_name,
|
||||
&SerializableSpanFields {
|
||||
fields: span.metadata().fields(),
|
||||
values,
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the span fields as object.
|
||||
struct SerializableSpanFields<'span> {
|
||||
fields: &'span tracing::field::FieldSet,
|
||||
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
}
|
||||
|
||||
impl serde::ser::Serialize for SerializableSpanFields<'_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
for (field, value) in std::iter::zip(self.fields, self.values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
serializer.serialize_entry(field.name(), value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
self.record_field(field, format_args!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
struct ExtractedSpanFields {
|
||||
names: &'static [&'static str],
|
||||
values: RefCell<Vec<serde_json::Value>>,
|
||||
values: Vec<Option<RawValue>>,
|
||||
}
|
||||
|
||||
impl ExtractedSpanFields {
|
||||
fn new(names: &'static [&'static str]) -> Self {
|
||||
ExtractedSpanFields {
|
||||
names,
|
||||
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
|
||||
values: vec![None; names.len()],
|
||||
}
|
||||
}
|
||||
|
||||
fn layer_span(&self, fields: &SpanFields) {
|
||||
let mut v = self.values.borrow_mut();
|
||||
fn layer_span(&mut self, fields: &SpanFields) {
|
||||
let SpanFields { values, span_info } = fields;
|
||||
|
||||
// extract the fields
|
||||
for (i, &j) in span_info.extract.iter().enumerate() {
|
||||
let Some(value) = values.get(j) else { continue };
|
||||
let Some(Some(value)) = values.get(j) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if !value.is_null() {
|
||||
// TODO: replace clone with reference, if possible.
|
||||
v[i] = value.clone();
|
||||
}
|
||||
// TODO: replace clone with reference, if possible.
|
||||
self.values[i] = Some(value.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn has_values(&self) -> bool {
|
||||
self.values.borrow().iter().any(|v| !v.is_null())
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::ser::Serialize for ExtractedSpanFields {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let values = self.values.borrow();
|
||||
for (key, value) in std::iter::zip(self.names, &*values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
self.values.iter().any(|v| v.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -112,6 +112,9 @@ pub struct ProxyMetrics {
|
||||
/// Number of bytes sent/received between all clients and backends.
|
||||
pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
|
||||
|
||||
/// Number of IO errors while logging.
|
||||
pub logging_errors_count: Counter,
|
||||
|
||||
/// Number of errors by a given classification.
|
||||
pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user