From 53fdcd252f0df35ab7a48f9bae54dd2ad61285fb Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 17 May 2025 21:51:00 +0200 Subject: [PATCH] remove locking from extract, use refcell instead --- proxy/src/logging.rs | 122 ++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 60 deletions(-) diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 7d909ce43a..db378117f3 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -18,7 +18,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; -use try_lock::TryLock; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -577,11 +576,18 @@ impl EventFormatter { )?; } - let spans = SerializableSpans { - ctx, - extract: ExtractedSpanFields::new(state), + let extracted = { + let spans = SerializableSpans { + spans: ctx + .event_span(event) + .map_or(vec![], |parent| parent.scope().collect_vec()), + extract: ExtractedSpanFields::new(state), + }; + + serializer.serialize_entry("spans", &spans)?; + + spans.extract }; - serializer.serialize_entry("spans", &spans)?; // TODO: thread-local cache? let pid = std::process::id(); @@ -632,9 +638,9 @@ impl EventFormatter { } } - if spans.extract.has_values() { + if extracted.has_values() { // TODO: add fields from event, too? - serializer.serialize_entry("extract", &spans.extract)?; + serializer.serialize_entry("extract", &extracted)?; } serializer.end() @@ -910,17 +916,17 @@ impl tracing::field::Visit for MessageFieldSkipper< /// with the span names as keys. To prevent collision we append a numberic value /// to the name. Also, collects any span fields we're interested in. Last one /// wins. -struct SerializableSpans<'a, 'ctx, Span, const F: usize> +struct SerializableSpans<'a, 'ctx, S, const F: usize> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { - ctx: &'a Context<'ctx, Span>, + spans: Vec>, extract: ExtractedSpanFields<'a, F>, } impl serde::ser::Serialize for SerializableSpans<'_, '_, Span, F> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + Span: for<'lookup> LookupSpan<'lookup>, { fn serialize(&self, serializer: Ser) -> Result where @@ -928,27 +934,33 @@ where { let mut serializer = serializer.serialize_map(None)?; - if let Some(leaf_span) = self.ctx.lookup_current() { - for span in leaf_span.scope().from_root() { - // Append a numeric callsite ID to the span name to keep the name unique - // in the JSON object. - let cid = self - .extract - .state - .callsite_ids - .pin() - .get(&span.metadata().callsite()) - .copied() - .unwrap_or_default(); + for span in self.spans.iter().rev() { + // Append a numeric callsite ID to the span name to keep the name unique + // in the JSON object. + let cid = self + .extract + .state + .callsite_ids + .pin() + .get(&span.metadata().callsite()) + .copied() + .unwrap_or_default(); - // Loki turns the # into an underscore during field name concatenation. - serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?; + let ext = span.extensions(); - serializer.serialize_value(&SerializableSpanFields { - span: &span, + // all spans must have this extension + let Some(fields) = ext.get::() else { + continue; + }; + + // Loki turns the # into an underscore during field name concatenation. + serializer.serialize_entry( + &format_args!("{}#{}", span.metadata().name(), &cid), + &SerializableSpanFields { + fields, extract: &self.extract, - })?; - } + }, + )?; } serializer.end() @@ -956,34 +968,25 @@ where } /// Serializes the span fields as object. -struct SerializableSpanFields<'a, 'span, Span, const F: usize> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ - span: &'a SpanRef<'span, Span>, +struct SerializableSpanFields<'a, const F: usize> { + fields: &'a SpanFields, extract: &'a ExtractedSpanFields<'a, F>, } -impl serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ +impl serde::ser::Serialize for SerializableSpanFields<'_, F> { fn serialize(&self, serializer: S) -> Result where S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - let ext = self.span.extensions(); - if let Some(data) = ext.get::() { - for (name, value) in &data.fields { - if value.is_null() { - continue; - } - - serializer.serialize_entry(name, value)?; - self.extract.set(name, value); + for (name, value) in &self.fields.fields { + if value.is_null() { + continue; } + + serializer.serialize_entry(name, value)?; + self.extract.set(name, value); } serializer.end() @@ -992,16 +995,14 @@ where struct ExtractedSpanFields<'a, const F: usize> { state: &'a JsonLoggingState, - // TODO: replace TryLock with something local thread and interior mutability. - // serde API doesn't let us use `mut`. - values: TryLock<([Option; F], bool)>, + values: RefCell<[serde_json::Value; F]>, } impl<'a, const F: usize> ExtractedSpanFields<'a, F> { fn new(state: &'a JsonLoggingState) -> Self { ExtractedSpanFields { state, - values: TryLock::new((array::from_fn(|_| Option::default()), false)), + values: RefCell::new(array::from_fn(|_| serde_json::Value::Null)), } } @@ -1009,16 +1010,15 @@ impl<'a, const F: usize> ExtractedSpanFields<'a, F> { fn set(&self, name: &'static str, value: &serde_json::Value) { // expectation: this list is small, so a linear search is fast. if let Some(index) = self.state.extract_fields.iter().position(|x| *x == name) { - let mut fields = self.values.try_lock().expect("thread-local use"); + let mut fields = self.values.borrow_mut(); // TODO: replace clone with reference, if possible. - fields.0[index] = Some(value.clone()); - fields.1 = true; + fields[index] = value.clone(); } } #[inline] fn has_values(&self) -> bool { - self.values.try_lock().expect("thread-local use").1 + self.values.borrow().iter().any(|v| !v.is_null()) } } @@ -1029,12 +1029,14 @@ impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { { let mut serializer = serializer.serialize_map(None)?; - let values = self.values.try_lock().expect("thread-local use"); - for (i, value) in values.0.iter().enumerate() { - if let Some(value) = value { - let key = self.state.extract_fields[i]; - serializer.serialize_entry(key, value)?; + let values = self.values.borrow(); + for (i, value) in values.iter().enumerate() { + if value.is_null() { + continue; } + + let key = self.state.extract_fields[i]; + serializer.serialize_entry(key, value)?; } serializer.end()