diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index e608300bd2..a87b0f1175 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -6,7 +6,6 @@ use std::{env, io}; use chrono::{DateTime, Utc}; use opentelemetry::trace::TraceContextExt; -use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -16,7 +15,9 @@ use tracing_subscriber::fmt::time::SystemTime; use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; -use tracing_subscriber::registry::{LookupSpan, SpanRef}; +use tracing_subscriber::registry::LookupSpan; + +use crate::metrics::Metrics; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -249,7 +250,7 @@ where // early, before OTel machinery, and add as event extension. let now = self.clock.now(); - let res: io::Result<()> = EVENT_FORMATTER.with(|f| { + EVENT_FORMATTER.with(|f| { let mut borrow = f.try_borrow_mut(); let formatter = match borrow.as_deref_mut() { Ok(formatter) => formatter, @@ -259,31 +260,19 @@ where Err(_) => &mut EventFormatter::new(), }; - formatter.reset(); formatter.format( now, event, &ctx, &self.skipped_field_indices, self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - }); + ); - // In case logging fails we generate a simpler JSON object. - if let Err(err) = res - && let Ok(mut line) = serde_json::to_vec(&serde_json::json!( { - "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), - "level": "ERROR", - "message": format_args!("cannot log event: {err:?}"), - "fields": { - "event": format_args!("{event:?}"), - }, - })) - { - line.push(b'\n'); - self.writer.make_writer().write_all(&line).ok(); - } + let mut writer = self.writer.make_writer(); + if writer.write_all(formatter.buffer()).is_err() { + Metrics::get().proxy.logging_errors_count.inc(); + } + }); } /// Registers a SpanFields instance as span extension. @@ -382,9 +371,24 @@ impl CallsiteSpanInfo { } } +#[derive(Clone)] +struct RawValue(Box<[u8]>); + +impl RawValue { + fn new(v: impl json::ValueEncoder) -> Self { + Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice()) + } +} + +impl json::ValueEncoder for &RawValue { + fn encode(self, v: json::ValueSer<'_>) { + v.write_raw_json(&self.0); + } +} + /// Stores span field values recorded during the spans lifetime. struct SpanFields { - values: [serde_json::Value; MAX_TRACING_FIELDS], + values: [Option; MAX_TRACING_FIELDS], /// cached span info so we can avoid extra hashmap lookups in the hot path. span_info: CallsiteSpanInfo, @@ -394,7 +398,7 @@ impl SpanFields { fn new(span_info: CallsiteSpanInfo) -> Self { Self { span_info, - values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS], + values: [const { None }; MAX_TRACING_FIELDS], } } } @@ -402,55 +406,55 @@ impl SpanFields { impl tracing::field::Visit for SpanFields { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { if let Ok(value) = i64::try_from(value) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } else { - self.values[field.index()] = serde_json::Value::from(format!("{value}")); + self.values[field.index()] = Some(RawValue::new(format_args!("{value}"))); } } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { if let Ok(value) = u64::try_from(value) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } else { - self.values[field.index()] = serde_json::Value::from(format!("{value}")); + self.values[field.index()] = Some(RawValue::new(format_args!("{value}"))); } } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - self.values[field.index()] = serde_json::Value::from(value); + self.values[field.index()] = Some(RawValue::new(value)); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.values[field.index()] = serde_json::Value::from(format!("{value:?}")); + self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}"))); } #[inline] @@ -459,7 +463,7 @@ impl tracing::field::Visit for SpanFields { field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { - self.values[field.index()] = serde_json::Value::from(format!("{value}")); + self.values[field.index()] = Some(RawValue::new(format_args!("{value}"))); } } @@ -508,11 +512,6 @@ impl EventFormatter { &self.logline_buffer } - #[inline] - fn reset(&mut self) { - self.logline_buffer.clear(); - } - fn format( &mut self, now: DateTime, @@ -520,8 +519,7 @@ impl EventFormatter { ctx: &Context<'_, S>, skipped_field_indices: &CallsiteMap, extract_fields: &'static [&'static str], - ) -> io::Result<()> - where + ) where S: Subscriber + for<'a> LookupSpan<'a>, { let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); @@ -536,78 +534,99 @@ impl EventFormatter { .copied() .unwrap_or_default(); - let mut serialize = || { - let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer); - - let mut serializer = serializer.serialize_map(None)?; - + self.logline_buffer.clear(); + let serializer = json::ValueSer::new(&mut self.logline_buffer); + json::value_as_object!(|serializer| { // Timestamp comes first, so raw lines can be sorted by timestamp. - serializer.serialize_entry("timestamp", ×tamp)?; + serializer.entry("timestamp", &*timestamp); // Level next. - serializer.serialize_entry("level", &meta.level().as_str())?; + serializer.entry("level", meta.level().as_str()); // Message next. - serializer.serialize_key("message")?; let mut message_extractor = - MessageFieldExtractor::new(serializer, skipped_field_indices); + MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices); event.record(&mut message_extractor); - let mut serializer = message_extractor.into_serializer()?; + message_extractor.finish(); // Direct message fields. - let mut fields_present = FieldsPresent(false, skipped_field_indices); - event.record(&mut fields_present); - if fields_present.0 { - serializer.serialize_entry( - "fields", - &SerializableEventFields(event, skipped_field_indices), - )?; + { + let mut message_skipper = MessageFieldSkipper::new( + serializer.key("fields").object(), + skipped_field_indices, + ); + event.record(&mut message_skipper); + + // rollback if no fields are present. + if message_skipper.present { + message_skipper.serializer.finish(); + } } - let spans = SerializableSpans { - // collect all spans from parent to root. - spans: ctx + let mut extracted = ExtractedSpanFields::new(extract_fields); + + let spans = serializer.key("spans"); + json::value_as_object!(|spans| { + let parent_spans = ctx .event_span(event) - .map_or(vec![], |parent| parent.scope().collect()), - extracted: ExtractedSpanFields::new(extract_fields), - }; - serializer.serialize_entry("spans", &spans)?; + .map_or(vec![], |parent| parent.scope().collect()); + + for span in parent_spans.iter().rev() { + let ext = span.extensions(); + + // all spans should have this extension. + let Some(fields) = ext.get() else { continue }; + + extracted.layer_span(fields); + + let SpanFields { values, span_info } = fields; + + let span_fields = spans.key(&*span_info.normalized_name); + json::value_as_object!(|span_fields| { + for (field, value) in std::iter::zip(span.metadata().fields(), values) { + if let Some(value) = value { + span_fields.entry(field.name(), value); + } + } + }); + } + }); // TODO: thread-local cache? let pid = std::process::id(); // Skip adding pid 1 to reduce noise for services running in containers. if pid != 1 { - serializer.serialize_entry("process_id", &pid)?; + serializer.entry("process_id", pid); } - THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?; + THREAD_ID.with(|tid| serializer.entry("thread_id", tid)); // TODO: tls cache? name could change if let Some(thread_name) = std::thread::current().name() && !thread_name.is_empty() && thread_name != "tokio-runtime-worker" { - serializer.serialize_entry("thread_name", thread_name)?; + serializer.entry("thread_name", thread_name); } if let Some(task_id) = tokio::task::try_id() { - serializer.serialize_entry("task_id", &format_args!("{task_id}"))?; + serializer.entry("task_id", format_args!("{task_id}")); } - serializer.serialize_entry("target", meta.target())?; + serializer.entry("target", meta.target()); // Skip adding module if it's the same as target. if let Some(module) = meta.module_path() && module != meta.target() { - serializer.serialize_entry("module", module)?; + serializer.entry("module", module); } if let Some(file) = meta.file() { if let Some(line) = meta.line() { - serializer.serialize_entry("src", &format_args!("{file}:{line}"))?; + serializer.entry("src", format_args!("{file}:{line}")); } else { - serializer.serialize_entry("src", file)?; + serializer.entry("src", file); } } @@ -616,124 +635,104 @@ impl EventFormatter { let otel_spanref = otel_context.span(); let span_context = otel_spanref.span_context(); if span_context.is_valid() { - serializer.serialize_entry( - "trace_id", - &format_args!("{}", span_context.trace_id()), - )?; + serializer.entry("trace_id", format_args!("{}", span_context.trace_id())); } } - if spans.extracted.has_values() { + if extracted.has_values() { // TODO: add fields from event, too? - serializer.serialize_entry("extract", &spans.extracted)?; + let extract = serializer.key("extract"); + json::value_as_object!(|extract| { + for (key, value) in std::iter::zip(extracted.names, extracted.values) { + if let Some(value) = value { + extract.entry(*key, &value); + } + } + }); } + }); - serializer.end() - }; - - serialize().map_err(io::Error::other)?; self.logline_buffer.push(b'\n'); - Ok(()) } } /// Extracts the message field that's mixed will other fields. -struct MessageFieldExtractor { - serializer: S, +struct MessageFieldExtractor<'buf> { + serializer: Option>, skipped_field_indices: SkippedFieldIndices, - state: Option>, } -impl MessageFieldExtractor { +impl<'buf> MessageFieldExtractor<'buf> { #[inline] - fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { + fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self { Self { - serializer, + serializer: Some(serializer), skipped_field_indices, - state: None, } } #[inline] - fn into_serializer(mut self) -> Result { - match self.state { - Some(Ok(())) => {} - Some(Err(err)) => return Err(err), - None => self.serializer.serialize_value("")?, + fn finish(self) { + if let Some(ser) = self.serializer { + ser.value(""); } - Ok(self.serializer) } #[inline] - fn accept_field(&self, field: &tracing::field::Field) -> bool { - self.state.is_none() - && field.name() == MESSAGE_FIELD + fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) { + if field.name() == MESSAGE_FIELD && !self.skipped_field_indices.contains(field.index()) + && let Some(ser) = self.serializer.take() + { + ser.value(v); + } } } -impl tracing::field::Visit for MessageFieldExtractor { +impl tracing::field::Visit for MessageFieldExtractor<'_> { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}"))); - } + self.record_field(field, format_args!("{value:x?}")); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&value)); - } + self.record_field(field, value); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}"))); - } + self.record_field(field, format_args!("{value:?}")); } #[inline] @@ -742,147 +741,83 @@ impl tracing::field::Visit for MessageFieldExtracto field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { - if self.accept_field(field) { - self.state = Some(self.serializer.serialize_value(&format_args!("{value}"))); - } - } -} - -/// Checks if there's any fields and field values present. If not, the JSON subobject -/// can be skipped. -// This is entirely optional and only cosmetic, though maybe helps a -// bit during log parsing in dashboards when there's no field with empty object. -struct FieldsPresent(pub bool, SkippedFieldIndices); - -// Even though some methods have an overhead (error, bytes) it is assumed the -// compiler won't include this since we ignore the value entirely. -impl tracing::field::Visit for FieldsPresent { - #[inline] - fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) { - if !self.1.contains(field.index()) - && field.name() != MESSAGE_FIELD - && !field.name().starts_with("log.") - { - self.0 |= true; - } - } -} - -/// Serializes the fields directly supplied with a log event. -struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices); - -impl serde::ser::Serialize for SerializableEventFields<'_, '_> { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - use serde::ser::SerializeMap; - let serializer = serializer.serialize_map(None)?; - let mut message_skipper = MessageFieldSkipper::new(serializer, self.1); - self.0.record(&mut message_skipper); - let serializer = message_skipper.into_serializer()?; - serializer.end() + self.record_field(field, format_args!("{value}")); } } /// A tracing field visitor that skips the message field. -struct MessageFieldSkipper { - serializer: S, +struct MessageFieldSkipper<'buf> { + serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices, - state: Result<(), S::Error>, + present: bool, } -impl MessageFieldSkipper { +impl<'buf> MessageFieldSkipper<'buf> { #[inline] - fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { + fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, - state: Ok(()), + present: false, } } #[inline] - fn accept_field(&self, field: &tracing::field::Field) -> bool { - self.state.is_ok() - && field.name() != MESSAGE_FIELD + fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) { + if field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") && !self.skipped_field_indices.contains(field.index()) - } - - #[inline] - fn into_serializer(self) -> Result { - self.state?; - Ok(self.serializer) + { + self.serializer.entry(field.name(), v); + self.present |= true; + } } } -impl tracing::field::Visit for MessageFieldSkipper { +impl tracing::field::Visit for MessageFieldSkipper<'_> { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - if self.accept_field(field) { - self.state = self - .serializer - .serialize_entry(field.name(), &format_args!("{value:x?}")); - } + self.record_field(field, format_args!("{value:x?}")); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - if self.accept_field(field) { - self.state = self.serializer.serialize_entry(field.name(), &value); - } + self.record_field(field, value); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - if self.accept_field(field) { - self.state = self - .serializer - .serialize_entry(field.name(), &format_args!("{value:?}")); - } + self.record_field(field, format_args!("{value:?}")); } #[inline] @@ -891,131 +826,40 @@ impl tracing::field::Visit for MessageFieldSkipper< field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { - if self.accept_field(field) { - self.state = self.serializer.serialize_value(&format_args!("{value}")); - } - } -} - -/// Serializes the span stack from root to leaf (parent of event) as object -/// with the span names as keys. To prevent collision we append a numberic value -/// to the name. Also, collects any span fields we're interested in. Last one -/// wins. -struct SerializableSpans<'ctx, S> -where - S: for<'lookup> LookupSpan<'lookup>, -{ - spans: Vec>, - extracted: ExtractedSpanFields, -} - -impl serde::ser::Serialize for SerializableSpans<'_, S> -where - S: for<'lookup> LookupSpan<'lookup>, -{ - fn serialize(&self, serializer: Ser) -> Result - where - Ser: serde::ser::Serializer, - { - let mut serializer = serializer.serialize_map(None)?; - - for span in self.spans.iter().rev() { - let ext = span.extensions(); - - // all spans should have this extension. - let Some(fields) = ext.get() else { continue }; - - self.extracted.layer_span(fields); - - let SpanFields { values, span_info } = fields; - serializer.serialize_entry( - &*span_info.normalized_name, - &SerializableSpanFields { - fields: span.metadata().fields(), - values, - }, - )?; - } - - serializer.end() - } -} - -/// Serializes the span fields as object. -struct SerializableSpanFields<'span> { - fields: &'span tracing::field::FieldSet, - values: &'span [serde_json::Value; MAX_TRACING_FIELDS], -} - -impl serde::ser::Serialize for SerializableSpanFields<'_> { - fn serialize(&self, serializer: S) -> Result - where - S: serde::ser::Serializer, - { - let mut serializer = serializer.serialize_map(None)?; - - for (field, value) in std::iter::zip(self.fields, self.values) { - if value.is_null() { - continue; - } - serializer.serialize_entry(field.name(), value)?; - } - - serializer.end() + self.record_field(field, format_args!("{value}")); } } struct ExtractedSpanFields { names: &'static [&'static str], - values: RefCell>, + values: Vec>, } impl ExtractedSpanFields { fn new(names: &'static [&'static str]) -> Self { ExtractedSpanFields { names, - values: RefCell::new(vec![serde_json::Value::Null; names.len()]), + values: vec![None; names.len()], } } - fn layer_span(&self, fields: &SpanFields) { - let mut v = self.values.borrow_mut(); + fn layer_span(&mut self, fields: &SpanFields) { let SpanFields { values, span_info } = fields; // extract the fields for (i, &j) in span_info.extract.iter().enumerate() { - let Some(value) = values.get(j) else { continue }; + let Some(Some(value)) = values.get(j) else { + continue; + }; - if !value.is_null() { - // TODO: replace clone with reference, if possible. - v[i] = value.clone(); - } + // TODO: replace clone with reference, if possible. + self.values[i] = Some(value.clone()); } } #[inline] fn has_values(&self) -> bool { - self.values.borrow().iter().any(|v| !v.is_null()) - } -} - -impl serde::ser::Serialize for ExtractedSpanFields { - fn serialize(&self, serializer: S) -> Result - where - S: serde::ser::Serializer, - { - let mut serializer = serializer.serialize_map(None)?; - - let values = self.values.borrow(); - for (key, value) in std::iter::zip(self.names, &*values) { - if value.is_null() { - continue; - } - - serializer.serialize_entry(key, value)?; - } - - serializer.end() + self.values.iter().any(|v| v.is_some()) } } diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index bf4d5a11eb..916604e2ec 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -112,6 +112,9 @@ pub struct ProxyMetrics { /// Number of bytes sent/received between all clients and backends. pub io_bytes: CounterVec>, + /// Number of IO errors while logging. + pub logging_errors_count: Counter, + /// Number of errors by a given classification. pub errors_total: CounterVec>,