From f5c5b99b58876bec3f10395598d20b737bc03abc Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 17 May 2025 21:04:48 +0200 Subject: [PATCH] remove lasso from json logger, use field index for lookup --- proxy/src/logging.rs | 101 +++++++++++++++++++++++-------------------- 1 file changed, 54 insertions(+), 47 deletions(-) diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index b952543eb8..7d909ce43a 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,11 +1,10 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; -use std::hash::BuildHasher; use std::sync::atomic::{AtomicU32, Ordering}; use std::{array, env, fmt, io}; use chrono::{DateTime, Utc}; -use lasso::ThreadedRodeo; +use itertools::Itertools; use opentelemetry::trace::TraceContextExt; use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; @@ -21,8 +20,6 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; use try_lock::TryLock; -type FieldNameHasher = std::hash::BuildHasherDefault; - /// Initialize logging and OpenTelemetry tracing and exporter. /// /// Logging can be configured using `RUST_LOG` environment variable. @@ -202,30 +199,33 @@ struct JsonLoggingLayer { state: JsonLoggingState, } +/// Map for values fixed at callsite registration. +// We use papaya here because registration rarely happens post-startup. +// papaya is good for read-heavy workloads. +// +// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy, +// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy. +type CallsiteMap = + papaya::HashMap>; + struct JsonLoggingState { - // Using MiniSpur only supports 65534 field names. - // This does not feel like a limitation to me. - spans: ThreadedRodeo, /// tracks which fields of each **event** are duplicates - skipped_field_indices: - papaya::HashMap, - /// tracks the fixed "callsite ID" for each span. + skipped_field_indices: CallsiteMap, + /// tracks the fixed "callsite ID" for each **span**. /// note: this is not stable between runs. - callsite_ids: papaya::HashMap, + callsite_ids: CallsiteMap, + + /// Fields we want to keep track of in a separate json object. // We use a const generic and arrays to bypass one heap allocation. - extract_fields: [lasso::MiniSpur; F], + extract_fields: [&'static str; F], } impl JsonLoggingLayer { fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self { - let spans = ThreadedRodeo::with_hasher(FieldNameHasher::default()); - let extract_fields = extract_fields.map(|field| spans.get_or_intern_static(field)); - JsonLoggingLayer { clock, writer, state: JsonLoggingState { - spans, skipped_field_indices: papaya::HashMap::default(), callsite_ids: papaya::HashMap::default(), extract_fields, @@ -282,9 +282,8 @@ where /// Registers a SpanFields instance as span extension. fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let mut fields = SpanFields::default(); - fields.record_fields(attrs.values().len(), attrs, &self.state.spans); + let fields = SpanFields::new(attrs); span.extensions_mut().insert(fields); } @@ -295,7 +294,7 @@ where // assumption: a span being updated by one thread, // and formatted by another thread is even rarer. if let Some(data) = span.extensions_mut().get_mut::() { - data.record_fields(values.len(), values, &self.state.spans); + data.record_fields(values); } } @@ -314,7 +313,7 @@ where } let mut field_indices = SkippedFieldIndices::default(); - let mut seen_fields = HashMap::<&'static str, usize, FieldNameHasher>::default(); + let mut seen_fields = HashMap::new(); for field in metadata.fields() { use std::collections::hash_map::Entry; match seen_fields.entry(field.name()) { @@ -363,43 +362,50 @@ impl fmt::Display for CallsiteId { } /// Stores span field values recorded during the spans lifetime. -#[derive(Default)] struct SpanFields { // TODO: Switch to custom json enum? - fields: HashMap, + fields: Vec<(&'static str, serde_json::Value)>, } impl SpanFields { #[inline] - fn record_fields( - &mut self, - reserve: usize, - fields: R, - lasso: &ThreadedRodeo, - ) { - self.fields.reserve(reserve); + fn new(attrs: &span::Attributes<'_>) -> Self { + let mut fields = attrs + .fields() + .iter() + .map(|f| (f.name(), serde_json::Value::Null)) + .collect_vec(); + + attrs.values().record(&mut SpanFieldsRecorder { + fields: &mut fields, + }); + + Self { fields } + } + + #[inline] + fn record_fields(&mut self, fields: R) { fields.record(&mut SpanFieldsRecorder { - spans: lasso, fields: &mut self.fields, }); } } /// Implements a tracing field visitor to convert and store values. -struct SpanFieldsRecorder<'r, 'm, S> { - spans: &'r ThreadedRodeo, - fields: &'m mut HashMap, +struct SpanFieldsRecorder<'m> { + fields: &'m mut [(&'static str, serde_json::Value)], } -impl SpanFieldsRecorder<'_, '_, S> { +impl SpanFieldsRecorder<'_> { #[inline] fn record_value(&mut self, field: &tracing::field::Field, value: serde_json::Value) { - let name = self.spans.get_or_intern_static(field.name()); - self.fields.insert(name, value); + let f = &mut self.fields[field.index()]; + debug_assert_eq!(f.0, field.name()); + f.1 = value; } } -impl tracing::field::Visit for SpanFieldsRecorder<'_, '_, S> { +impl tracing::field::Visit for SpanFieldsRecorder<'_> { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { self.record_value(field, serde_json::Value::from(value)); @@ -971,8 +977,12 @@ where let ext = self.span.extensions(); if let Some(data) = ext.get::() { for (name, value) in &data.fields { - serializer.serialize_entry(self.extract.state.spans.resolve(name), value)?; - self.extract.set(*name, value); + if value.is_null() { + continue; + } + + serializer.serialize_entry(name, value)?; + self.extract.set(name, value); } } @@ -996,7 +1006,7 @@ impl<'a, const F: usize> ExtractedSpanFields<'a, F> { } #[inline] - fn set(&self, name: lasso::MiniSpur, value: &serde_json::Value) { + fn set(&self, name: &'static str, value: &serde_json::Value) { // expectation: this list is small, so a linear search is fast. if let Some(index) = self.state.extract_fields.iter().position(|x| *x == name) { let mut fields = self.values.try_lock().expect("thread-local use"); @@ -1023,7 +1033,7 @@ impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { for (i, value) in values.0.iter().enumerate() { if let Some(value) = value { let key = self.state.extract_fields[i]; - serializer.serialize_entry(self.state.spans.resolve(&key), value)?; + serializer.serialize_entry(key, value)?; } } @@ -1078,16 +1088,13 @@ mod tests { current_time: Mutex::new(Utc::now()), }); let buffer = Arc::new(Mutex::new(Vec::new())); - let spans = ThreadedRodeo::with_hasher(FieldNameHasher::default()); - let field_x = spans.get_or_intern_static("x"); let log_layer = JsonLoggingLayer { clock: clock.clone(), writer: buffer.clone(), state: JsonLoggingState { - spans, - skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), - extract_fields: [field_x], + skipped_field_indices: CallsiteMap::default(), + callsite_ids: CallsiteMap::default(), + extract_fields: ["x"], }, };