mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
remove lasso from json logger, use field index for lookup
This commit is contained in:
@@ -1,11 +1,10 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::{array, env, fmt, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use lasso::ThreadedRodeo;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
@@ -21,8 +20,6 @@ use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use try_lock::TryLock;
|
||||
|
||||
type FieldNameHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
/// Logging can be configured using `RUST_LOG` environment variable.
|
||||
@@ -202,30 +199,33 @@ struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
|
||||
state: JsonLoggingState<F>,
|
||||
}
|
||||
|
||||
/// Map for values fixed at callsite registration.
|
||||
// We use papaya here because registration rarely happens post-startup.
|
||||
// papaya is good for read-heavy workloads.
|
||||
//
|
||||
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
|
||||
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
|
||||
type CallsiteMap<T> =
|
||||
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
|
||||
|
||||
struct JsonLoggingState<const F: usize> {
|
||||
// Using MiniSpur only supports 65534 field names.
|
||||
// This does not feel like a limitation to me.
|
||||
spans: ThreadedRodeo<lasso::MiniSpur, FieldNameHasher>,
|
||||
/// tracks which fields of each **event** are duplicates
|
||||
skipped_field_indices:
|
||||
papaya::HashMap<callsite::Identifier, SkippedFieldIndices, FieldNameHasher>,
|
||||
/// tracks the fixed "callsite ID" for each span.
|
||||
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
|
||||
/// tracks the fixed "callsite ID" for each **span**.
|
||||
/// note: this is not stable between runs.
|
||||
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId, FieldNameHasher>,
|
||||
callsite_ids: CallsiteMap<CallsiteId>,
|
||||
|
||||
/// Fields we want to keep track of in a separate json object.
|
||||
// We use a const generic and arrays to bypass one heap allocation.
|
||||
extract_fields: [lasso::MiniSpur; F],
|
||||
extract_fields: [&'static str; F],
|
||||
}
|
||||
|
||||
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
|
||||
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
|
||||
let spans = ThreadedRodeo::with_hasher(FieldNameHasher::default());
|
||||
let extract_fields = extract_fields.map(|field| spans.get_or_intern_static(field));
|
||||
|
||||
JsonLoggingLayer {
|
||||
clock,
|
||||
writer,
|
||||
state: JsonLoggingState {
|
||||
spans,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
extract_fields,
|
||||
@@ -282,9 +282,8 @@ where
|
||||
/// Registers a SpanFields instance as a span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let mut fields = SpanFields::default();
|
||||
fields.record_fields(attrs.values().len(), attrs, &self.state.spans);
|
||||
|
||||
let fields = SpanFields::new(attrs);
|
||||
span.extensions_mut().insert(fields);
|
||||
}
|
||||
|
||||
@@ -295,7 +294,7 @@ where
|
||||
// assumption: a span being updated by one thread,
|
||||
// and formatted by another thread is even rarer.
|
||||
if let Some(data) = span.extensions_mut().get_mut::<SpanFields>() {
|
||||
data.record_fields(values.len(), values, &self.state.spans);
|
||||
data.record_fields(values);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,7 +313,7 @@ where
|
||||
}
|
||||
|
||||
let mut field_indices = SkippedFieldIndices::default();
|
||||
let mut seen_fields = HashMap::<&'static str, usize, FieldNameHasher>::default();
|
||||
let mut seen_fields = HashMap::new();
|
||||
for field in metadata.fields() {
|
||||
use std::collections::hash_map::Entry;
|
||||
match seen_fields.entry(field.name()) {
|
||||
@@ -363,43 +362,50 @@ impl fmt::Display for CallsiteId {
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the span's lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
// TODO: Switch to custom json enum?
|
||||
fields: HashMap<lasso::MiniSpur, serde_json::Value, FieldNameHasher>,
|
||||
fields: Vec<(&'static str, serde_json::Value)>,
|
||||
}
|
||||
|
||||
impl SpanFields {
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(
|
||||
&mut self,
|
||||
reserve: usize,
|
||||
fields: R,
|
||||
lasso: &ThreadedRodeo<lasso::MiniSpur, FieldNameHasher>,
|
||||
) {
|
||||
self.fields.reserve(reserve);
|
||||
fn new(attrs: &span::Attributes<'_>) -> Self {
|
||||
let mut fields = attrs
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|f| (f.name(), serde_json::Value::Null))
|
||||
.collect_vec();
|
||||
|
||||
attrs.values().record(&mut SpanFieldsRecorder {
|
||||
fields: &mut fields,
|
||||
});
|
||||
|
||||
Self { fields }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(&mut self, fields: R) {
|
||||
fields.record(&mut SpanFieldsRecorder {
|
||||
spans: lasso,
|
||||
fields: &mut self.fields,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements a tracing field visitor to convert and store values.
|
||||
struct SpanFieldsRecorder<'r, 'm, S> {
|
||||
spans: &'r ThreadedRodeo<lasso::MiniSpur, FieldNameHasher>,
|
||||
fields: &'m mut HashMap<lasso::MiniSpur, serde_json::Value, S>,
|
||||
struct SpanFieldsRecorder<'m> {
|
||||
fields: &'m mut [(&'static str, serde_json::Value)],
|
||||
}
|
||||
|
||||
impl<S: BuildHasher> SpanFieldsRecorder<'_, '_, S> {
|
||||
impl SpanFieldsRecorder<'_> {
|
||||
#[inline]
|
||||
fn record_value(&mut self, field: &tracing::field::Field, value: serde_json::Value) {
|
||||
let name = self.spans.get_or_intern_static(field.name());
|
||||
self.fields.insert(name, value);
|
||||
let f = &mut self.fields[field.index()];
|
||||
debug_assert_eq!(f.0, field.name());
|
||||
f.1 = value;
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BuildHasher> tracing::field::Visit for SpanFieldsRecorder<'_, '_, S> {
|
||||
impl tracing::field::Visit for SpanFieldsRecorder<'_> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.record_value(field, serde_json::Value::from(value));
|
||||
@@ -971,8 +977,12 @@ where
|
||||
let ext = self.span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (name, value) in &data.fields {
|
||||
serializer.serialize_entry(self.extract.state.spans.resolve(name), value)?;
|
||||
self.extract.set(*name, value);
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(name, value)?;
|
||||
self.extract.set(name, value);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -996,7 +1006,7 @@ impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set(&self, name: lasso::MiniSpur, value: &serde_json::Value) {
|
||||
fn set(&self, name: &'static str, value: &serde_json::Value) {
|
||||
// expectation: this list is small, so a linear search is fast.
|
||||
if let Some(index) = self.state.extract_fields.iter().position(|x| *x == name) {
|
||||
let mut fields = self.values.try_lock().expect("thread-local use");
|
||||
@@ -1023,7 +1033,7 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
for (i, value) in values.0.iter().enumerate() {
|
||||
if let Some(value) = value {
|
||||
let key = self.state.extract_fields[i];
|
||||
serializer.serialize_entry(self.state.spans.resolve(&key), value)?;
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1078,16 +1088,13 @@ mod tests {
|
||||
current_time: Mutex::new(Utc::now()),
|
||||
});
|
||||
let buffer = Arc::new(Mutex::new(Vec::new()));
|
||||
let spans = ThreadedRodeo::with_hasher(FieldNameHasher::default());
|
||||
let field_x = spans.get_or_intern_static("x");
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
writer: buffer.clone(),
|
||||
state: JsonLoggingState {
|
||||
spans,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
extract_fields: [field_x],
|
||||
skipped_field_indices: CallsiteMap::default(),
|
||||
callsite_ids: CallsiteMap::default(),
|
||||
extract_fields: ["x"],
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user