diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index efa3c0b514..a58b55a704 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,13 +1,11 @@ -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; -use std::hash::BuildHasher; +use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; -use std::{array, env, fmt, io}; +use std::{env, io}; use chrono::{DateTime, Utc}; -use indexmap::IndexSet; use opentelemetry::trace::TraceContextExt; -use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; @@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; -use try_lock::TryLock; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result { StderrWriter { stderr: std::io::stderr(), }, - ["request_id", "session_id", "conn_id"], + &["request_id", "session_id", "conn_id"], )) } else { None @@ -183,50 +180,65 @@ impl Clock for RealClock { /// Name of the field used by tracing crate to store the event message. const MESSAGE_FIELD: &str = "message"; +/// Tracing used to enforce that spans/events have no more than 32 fields. +/// It seems this is no longer the case, but it's still documented in some places. +/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and +/// rely on it for some (minor) performance gains. +const MAX_TRACING_FIELDS: usize = 32; + thread_local! { - /// Protects against deadlocks and double panics during log writing. - /// The current panic handler will use tracing to log panic information. - static REENTRANCY_GUARD: Cell = const { Cell::new(false) }; /// Thread-local instance with per-thread buffer for log writing. - static EVENT_FORMATTER: RefCell = RefCell::new(EventFormatter::new()); + static EVENT_FORMATTER: RefCell = const { RefCell::new(EventFormatter::new()) }; /// Cached OS thread ID. static THREAD_ID: u64 = gettid::gettid(); } +/// Map for values fixed at callsite registration. +// We use papaya here because registration rarely happens post-startup. +// papaya is good for read-heavy workloads. +// +// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy, +// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy. +type CallsiteMap = + papaya::HashMap>; + /// Implements tracing layer to handle events specific to logging. -struct JsonLoggingLayer { +struct JsonLoggingLayer { clock: C, - skipped_field_indices: papaya::HashMap, - callsite_ids: papaya::HashMap, writer: W, - // We use a const generic and arrays to bypass one heap allocation. - extract_fields: IndexSet<&'static str>, - _marker: std::marker::PhantomData<[&'static str; F]>, + + /// tracks which fields of each **event** are duplicates + skipped_field_indices: CallsiteMap, + + span_info: CallsiteMap, + + /// Fields we want to keep track of in a separate json object. + extract_fields: &'static [&'static str], } -impl JsonLoggingLayer { - fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self { +impl JsonLoggingLayer { + fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self { JsonLoggingLayer { clock, - skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), + skipped_field_indices: CallsiteMap::default(), + span_info: CallsiteMap::default(), writer, - extract_fields: IndexSet::from_iter(extract_fields), - _marker: std::marker::PhantomData, + extract_fields, } } #[inline] - fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId { - *self - .callsite_ids + fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo { + self.span_info .pin() - .get_or_insert_with(cs, CallsiteId::next) + .get_or_insert_with(metadata.callsite(), || { + CallsiteSpanInfo::new(metadata, self.extract_fields) + }) + .clone() } } -impl Layer - for JsonLoggingLayer +impl Layer for JsonLoggingLayer where S: Subscriber + for<'a> LookupSpan<'a>, { @@ -237,35 +249,25 @@ where // early, before OTel machinery, and add as event extension. let now = self.clock.now(); - let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| { - if entered.get() { - let mut formatter = EventFormatter::new(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - } else { - entered.set(true); - defer!(entered.set(false);); + let res: io::Result<()> = EVENT_FORMATTER.with(|f| { + let mut borrow = f.try_borrow_mut(); + let formatter = match borrow.as_deref_mut() { + Ok(formatter) => formatter, + // If the thread local formatter is borrowed, + // then we likely hit an edge case were we panicked during formatting. + // We allow the logging to proceed with an uncached formatter. + Err(_) => &mut EventFormatter::new(), + }; - EVENT_FORMATTER.with_borrow_mut(move |formatter| { - formatter.reset(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - }) - } + formatter.reset(); + formatter.format( + now, + event, + &ctx, + &self.skipped_field_indices, + self.extract_fields, + )?; + self.writer.make_writer().write_all(formatter.buffer()) }); // In case logging fails we generate a simpler JSON object. @@ -287,50 +289,48 @@ where /// Registers a SpanFields instance as span extension. fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let fields = SpanFields::default(); - fields.record_fields(attrs); - // This could deadlock when there's a panic somewhere in the tracing - // event handling and a read or write guard is still held. This includes - // the OTel subscriber. - let mut exts = span.extensions_mut(); + let mut fields = SpanFields::new(self.span_info(span.metadata())); + attrs.record(&mut fields); - exts.insert(fields); + // This is a new span: the extensions should not be locked + // unless some layer spawned a thread to process this span. + // I don't think any layers do that. + span.extensions_mut().insert(fields); } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let ext = span.extensions(); - if let Some(data) = ext.get::() { - data.record_fields(values); + + // assumption: `on_record` is rarely called. + // assumption: a span being updated by one thread, + // and formatted by another thread is even rarer. + let mut ext = span.extensions_mut(); + if let Some(fields) = ext.get_mut::() { + values.record(fields); } } - /// Called (lazily) whenever a new log call is executed. We quickly check - /// for duplicate field names and record duplicates as skippable. Last one - /// wins. + /// Called (lazily) roughly once per event/span instance. We quickly check + /// for duplicate field names and record duplicates as skippable. Last field wins. fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + debug_assert!( + metadata.fields().len() <= MAX_TRACING_FIELDS, + "callsite {metadata:?} has too many fields." + ); + if !metadata.is_event() { - self.callsite_id(metadata.callsite()); + // register the span info. + self.span_info(metadata); // Must not be never because we wouldn't get trace and span data. return Interest::always(); } let mut field_indices = SkippedFieldIndices::default(); - let mut seen_fields = HashMap::<&'static str, usize>::new(); + let mut seen_fields = HashMap::new(); for field in metadata.fields() { - use std::collections::hash_map::Entry; - match seen_fields.entry(field.name()) { - Entry::Vacant(entry) => { - // field not seen yet - entry.insert(field.index()); - } - Entry::Occupied(mut entry) => { - // replace currently stored index - let old_index = entry.insert(field.index()); - // ... and append it to list of skippable indices - field_indices.push(old_index); - } + if let Some(old_index) = seen_fields.insert(field.name(), field.index()) { + field_indices.set(old_index); } } @@ -344,110 +344,113 @@ where } } -#[derive(Copy, Clone, Debug, Default)] -#[repr(transparent)] -struct CallsiteId(u32); +/// Any span info that is fixed to a particular callsite. Not variable between span instances. +#[derive(Clone)] +struct CallsiteSpanInfo { + /// index of each field to extract. usize::MAX if not found. + extract: Arc<[usize]>, -impl CallsiteId { - #[inline] - fn next() -> Self { - // Start at 1 to reserve 0 for default. - static COUNTER: AtomicU32 = AtomicU32::new(1); - CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed)) - } + /// tracks the fixed "callsite ID" for each span. + /// note: this is not stable between runs. + normalized_name: Arc, } -impl fmt::Display for CallsiteId { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl CallsiteSpanInfo { + fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self { + // Start at 1 to reserve 0 for default. + static COUNTER: AtomicU32 = AtomicU32::new(1); + + let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect(); + + // get all the indices of span fields we want to focus + let extract = extract_fields + .iter() + // use rposition, since we want last match wins. + .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX)) + .collect(); + + // normalized_name is unique for each callsite, but it is not + // unified across separate proxy instances. + // todo: can we do better here? + let cid = COUNTER.fetch_add(1, Ordering::Relaxed); + let normalized_name = format!("{}#{cid}", metadata.name()).into(); + + Self { + extract, + normalized_name, + } } } /// Stores span field values recorded during the spans lifetime. -#[derive(Default)] struct SpanFields { - // TODO: Switch to custom enum with lasso::Spur for Strings? - fields: papaya::HashMap<&'static str, serde_json::Value>, + values: [serde_json::Value; MAX_TRACING_FIELDS], + + /// cached span info so we can avoid extra hashmap lookups in the hot path. + span_info: CallsiteSpanInfo, } impl SpanFields { - #[inline] - fn record_fields(&self, fields: R) { - fields.record(&mut SpanFieldsRecorder { - fields: self.fields.pin(), - }); + fn new(span_info: CallsiteSpanInfo) -> Self { + Self { + span_info, + values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS], + } } } -/// Implements a tracing field visitor to convert and store values. -struct SpanFieldsRecorder<'m, S, G> { - fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>, -} - -impl tracing::field::Visit for SpanFieldsRecorder<'_, S, G> { +impl tracing::field::Visit for SpanFields { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { if let Ok(value) = i64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { if let Ok(value) = u64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value:?}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value:?}")); } #[inline] @@ -456,38 +459,33 @@ impl tracing::field::Visit for SpanFieldsRecor field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } /// List of field indices skipped during logging. Can list duplicate fields or /// metafields not meant to be logged. -#[derive(Clone, Default)] +#[derive(Copy, Clone, Default)] struct SkippedFieldIndices { - bits: u64, + // 32-bits is large enough for `MAX_TRACING_FIELDS` + bits: u32, } impl SkippedFieldIndices { #[inline] - fn is_empty(&self) -> bool { + fn is_empty(self) -> bool { self.bits == 0 } #[inline] - fn push(&mut self, index: usize) { - self.bits |= 1u64 - .checked_shl(index as u32) - .expect("field index too large"); + fn set(&mut self, index: usize) { + debug_assert!(index <= 32, "index out of bounds of 32-bit set"); + self.bits |= 1 << index; } #[inline] - fn contains(&self, index: usize) -> bool { - self.bits - & 1u64 - .checked_shl(index as u32) - .expect("field index too large") - != 0 + fn contains(self, index: usize) -> bool { + self.bits & (1 << index) != 0 } } @@ -499,7 +497,7 @@ struct EventFormatter { impl EventFormatter { #[inline] - fn new() -> Self { + const fn new() -> Self { EventFormatter { logline_buffer: Vec::new(), } @@ -515,14 +513,13 @@ impl EventFormatter { self.logline_buffer.clear(); } - fn format( + fn format( &mut self, now: DateTime, event: &Event<'_>, ctx: &Context<'_, S>, - skipped_field_indices: &papaya::HashMap, - callsite_ids: &papaya::HashMap, - extract_fields: &IndexSet<&'static str>, + skipped_field_indices: &CallsiteMap, + extract_fields: &'static [&'static str], ) -> io::Result<()> where S: Subscriber + for<'a> LookupSpan<'a>, @@ -533,8 +530,11 @@ impl EventFormatter { let normalized_meta = event.normalized_metadata(); let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata()); - let skipped_field_indices = skipped_field_indices.pin(); - let skipped_field_indices = skipped_field_indices.get(&meta.callsite()); + let skipped_field_indices = skipped_field_indices + .pin() + .get(&meta.callsite()) + .copied() + .unwrap_or_default(); let mut serialize = || { let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer); @@ -565,9 +565,11 @@ impl EventFormatter { } let spans = SerializableSpans { - ctx, - callsite_ids, - extract: ExtractedSpanFields::<'_, F>::new(extract_fields), + // collect all spans from parent to root. + spans: ctx + .event_span(event) + .map_or(vec![], |parent| parent.scope().collect()), + extracted: ExtractedSpanFields::new(extract_fields), }; serializer.serialize_entry("spans", &spans)?; @@ -620,9 +622,9 @@ impl EventFormatter { } } - if spans.extract.has_values() { + if spans.extracted.has_values() { // TODO: add fields from event, too? - serializer.serialize_entry("extract", &spans.extract)?; + serializer.serialize_entry("extract", &spans.extracted)?; } serializer.end() @@ -635,15 +637,15 @@ impl EventFormatter { } /// Extracts the message field that's mixed will other fields. -struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> { +struct MessageFieldExtractor { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Option>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { +impl MessageFieldExtractor { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { fn accept_field(&self, field: &tracing::field::Field) -> bool { self.state.is_none() && field.name() == MESSAGE_FIELD - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } } -impl tracing::field::Visit for MessageFieldExtractor<'_, S> { +impl tracing::field::Visit for MessageFieldExtractor { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -751,14 +751,14 @@ impl tracing::field::Visit for MessageFieldExtracto /// can be skipped. // This is entirely optional and only cosmetic, though maybe helps a // bit during log parsing in dashboards when there's no field with empty object. -struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>); +struct FieldsPresent(pub bool, SkippedFieldIndices); // Even though some methods have an overhead (error, bytes) it is assumed the // compiler won't include this since we ignore the value entirely. -impl tracing::field::Visit for FieldsPresent<'_> { +impl tracing::field::Visit for FieldsPresent { #[inline] fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) { - if !self.1.is_some_and(|i| i.contains(field.index())) + if !self.1.contains(field.index()) && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") { @@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> { } /// Serializes the fields directly supplied with a log event. -struct SerializableEventFields<'a, 'event>( - &'a tracing::Event<'event>, - Option<&'a SkippedFieldIndices>, -); +struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices); impl serde::ser::Serialize for SerializableEventFields<'_, '_> { fn serialize(&self, serializer: S) -> Result @@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> { } /// A tracing field visitor that skips the message field. -struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> { +struct MessageFieldSkipper { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Result<(), S::Error>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { +impl MessageFieldSkipper { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { self.state.is_ok() && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } #[inline] @@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { } } -impl tracing::field::Visit for MessageFieldSkipper<'_, S> { +impl tracing::field::Visit for MessageFieldSkipper { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -905,18 +900,17 @@ impl tracing::field::Visit for MessageFieldSkipper< /// with the span names as keys. To prevent collision we append a numberic value /// to the name. Also, collects any span fields we're interested in. Last one /// wins. -struct SerializableSpans<'a, 'ctx, Span, const F: usize> +struct SerializableSpans<'ctx, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { - ctx: &'a Context<'ctx, Span>, - callsite_ids: &'a papaya::HashMap, - extract: ExtractedSpanFields<'a, F>, + spans: Vec>, + extracted: ExtractedSpanFields, } -impl serde::ser::Serialize for SerializableSpans<'_, '_, Span, F> +impl serde::ser::Serialize for SerializableSpans<'_, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { fn serialize(&self, serializer: Ser) -> Result where @@ -924,25 +918,22 @@ where { let mut serializer = serializer.serialize_map(None)?; - if let Some(leaf_span) = self.ctx.lookup_current() { - for span in leaf_span.scope().from_root() { - // Append a numeric callsite ID to the span name to keep the name unique - // in the JSON object. - let cid = self - .callsite_ids - .pin() - .get(&span.metadata().callsite()) - .copied() - .unwrap_or_default(); + for span in self.spans.iter().rev() { + let ext = span.extensions(); - // Loki turns the # into an underscore during field name concatenation. - serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?; + // all spans should have this extension. + let Some(fields) = ext.get() else { continue }; - serializer.serialize_value(&SerializableSpanFields { - span: &span, - extract: &self.extract, - })?; - } + self.extracted.layer_span(fields); + + let SpanFields { values, span_info } = fields; + serializer.serialize_entry( + &*span_info.normalized_name, + &SerializableSpanFields { + fields: span.metadata().fields(), + values, + }, + )?; } serializer.end() @@ -950,80 +941,77 @@ where } /// Serializes the span fields as object. -struct SerializableSpanFields<'a, 'span, Span, const F: usize> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ - span: &'a SpanRef<'span, Span>, - extract: &'a ExtractedSpanFields<'a, F>, +struct SerializableSpanFields<'span> { + fields: &'span tracing::field::FieldSet, + values: &'span [serde_json::Value; MAX_TRACING_FIELDS], } -impl serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ +impl serde::ser::Serialize for SerializableSpanFields<'_> { fn serialize(&self, serializer: S) -> Result where S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - let ext = self.span.extensions(); - if let Some(data) = ext.get::() { - for (name, value) in &data.fields.pin() { - serializer.serialize_entry(name, value)?; - // TODO: replace clone with reference, if possible. - self.extract.set(name, value.clone()); + for (field, value) in std::iter::zip(self.fields, self.values) { + if value.is_null() { + continue; } + serializer.serialize_entry(field.name(), value)?; } serializer.end() } } -struct ExtractedSpanFields<'a, const F: usize> { - names: &'a IndexSet<&'static str>, - // TODO: replace TryLock with something local thread and interior mutability. - // serde API doesn't let us use `mut`. - values: TryLock<([Option; F], bool)>, +struct ExtractedSpanFields { + names: &'static [&'static str], + values: RefCell>, } -impl<'a, const F: usize> ExtractedSpanFields<'a, F> { - fn new(names: &'a IndexSet<&'static str>) -> Self { +impl ExtractedSpanFields { + fn new(names: &'static [&'static str]) -> Self { ExtractedSpanFields { names, - values: TryLock::new((array::from_fn(|_| Option::default()), false)), + values: RefCell::new(vec![serde_json::Value::Null; names.len()]), } } - #[inline] - fn set(&self, name: &'static str, value: serde_json::Value) { - if let Some((index, _)) = self.names.get_full(name) { - let mut fields = self.values.try_lock().expect("thread-local use"); - fields.0[index] = Some(value); - fields.1 = true; + fn layer_span(&self, fields: &SpanFields) { + let mut v = self.values.borrow_mut(); + let SpanFields { values, span_info } = fields; + + // extract the fields + for (i, &j) in span_info.extract.iter().enumerate() { + let Some(value) = values.get(j) else { continue }; + + if !value.is_null() { + // TODO: replace clone with reference, if possible. + v[i] = value.clone(); + } } } #[inline] fn has_values(&self) -> bool { - self.values.try_lock().expect("thread-local use").1 + self.values.borrow().iter().any(|v| !v.is_null()) } } -impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { +impl serde::ser::Serialize for ExtractedSpanFields { fn serialize(&self, serializer: S) -> Result where S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - let values = self.values.try_lock().expect("thread-local use"); - for (i, value) in values.0.iter().enumerate() { - if let Some(value) = value { - let key = self.names[i]; - serializer.serialize_entry(key, value)?; + let values = self.values.borrow(); + for (key, value) in std::iter::zip(self.names, &*values) { + if value.is_null() { + continue; } + + serializer.serialize_entry(key, value)?; } serializer.end() @@ -1032,7 +1020,6 @@ impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { #[cfg(test)] mod tests { - use std::marker::PhantomData; use std::sync::{Arc, Mutex, MutexGuard}; use assert_json_diff::assert_json_eq; @@ -1081,10 +1068,9 @@ mod tests { let log_layer = JsonLoggingLayer { clock: clock.clone(), skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), + span_info: papaya::HashMap::default(), writer: buffer.clone(), - extract_fields: IndexSet::from_iter(["x"]), - _marker: PhantomData::<[&'static str; 1]>, + extract_fields: &["x"], }; let registry = tracing_subscriber::Registry::default().with(log_layer);