mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
proxy(logging): significant changes to json logging internals for performance. (#11974)
#11962 Please review each commit separately. Each commit is rather small in goal. The overall goal of this PR is to keep the behaviour identical, but shave away small inefficiencies here and there.
This commit is contained in:
@@ -1,13 +1,11 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::{array, env, fmt, io};
|
||||
use std::{env, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use indexmap::IndexSet;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
|
||||
@@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use try_lock::TryLock;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
StderrWriter {
|
||||
stderr: std::io::stderr(),
|
||||
},
|
||||
["request_id", "session_id", "conn_id"],
|
||||
&["request_id", "session_id", "conn_id"],
|
||||
))
|
||||
} else {
|
||||
None
|
||||
@@ -183,50 +180,65 @@ impl Clock for RealClock {
|
||||
/// Name of the field used by tracing crate to store the event message.
|
||||
const MESSAGE_FIELD: &str = "message";
|
||||
|
||||
/// Tracing used to enforce that spans/events have no more than 32 fields.
|
||||
/// It seems this is no longer the case, but it's still documented in some places.
|
||||
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
|
||||
/// rely on it for some (minor) performance gains.
|
||||
const MAX_TRACING_FIELDS: usize = 32;
|
||||
|
||||
thread_local! {
|
||||
/// Protects against deadlocks and double panics during log writing.
|
||||
/// The current panic handler will use tracing to log panic information.
|
||||
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
|
||||
/// Thread-local instance with per-thread buffer for log writing.
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
|
||||
/// Cached OS thread ID.
|
||||
static THREAD_ID: u64 = gettid::gettid();
|
||||
}
|
||||
|
||||
/// Map for values fixed at callsite registration.
|
||||
// We use papaya here because registration rarely happens post-startup.
|
||||
// papaya is good for read-heavy workloads.
|
||||
//
|
||||
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
|
||||
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
|
||||
type CallsiteMap<T> =
|
||||
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
clock: C,
|
||||
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
writer: W,
|
||||
// We use a const generic and arrays to bypass one heap allocation.
|
||||
extract_fields: IndexSet<&'static str>,
|
||||
_marker: std::marker::PhantomData<[&'static str; F]>,
|
||||
|
||||
/// tracks which fields of each **event** are duplicates
|
||||
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
|
||||
|
||||
span_info: CallsiteMap<CallsiteSpanInfo>,
|
||||
|
||||
/// Fields we want to keep track of in a separate json object.
|
||||
extract_fields: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
|
||||
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
|
||||
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
|
||||
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
|
||||
JsonLoggingLayer {
|
||||
clock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
skipped_field_indices: CallsiteMap::default(),
|
||||
span_info: CallsiteMap::default(),
|
||||
writer,
|
||||
extract_fields: IndexSet::from_iter(extract_fields),
|
||||
_marker: std::marker::PhantomData,
|
||||
extract_fields,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
|
||||
*self
|
||||
.callsite_ids
|
||||
fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
|
||||
self.span_info
|
||||
.pin()
|
||||
.get_or_insert_with(cs, CallsiteId::next)
|
||||
.get_or_insert_with(metadata.callsite(), || {
|
||||
CallsiteSpanInfo::new(metadata, self.extract_fields)
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
|
||||
for JsonLoggingLayer<C, W, F>
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
@@ -237,35 +249,25 @@ where
|
||||
// early, before OTel machinery, and add as event extension.
|
||||
let now = self.clock.now();
|
||||
|
||||
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
|
||||
if entered.get() {
|
||||
let mut formatter = EventFormatter::new();
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
} else {
|
||||
entered.set(true);
|
||||
defer!(entered.set(false););
|
||||
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
|
||||
let mut borrow = f.try_borrow_mut();
|
||||
let formatter = match borrow.as_deref_mut() {
|
||||
Ok(formatter) => formatter,
|
||||
// If the thread local formatter is borrowed,
|
||||
// then we likely hit an edge case were we panicked during formatting.
|
||||
// We allow the logging to proceed with an uncached formatter.
|
||||
Err(_) => &mut EventFormatter::new(),
|
||||
};
|
||||
|
||||
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
|
||||
formatter.reset();
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
})
|
||||
}
|
||||
formatter.reset();
|
||||
formatter.format(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
});
|
||||
|
||||
// In case logging fails we generate a simpler JSON object.
|
||||
@@ -287,50 +289,48 @@ where
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
let mut exts = span.extensions_mut();
|
||||
let mut fields = SpanFields::new(self.span_info(span.metadata()));
|
||||
attrs.record(&mut fields);
|
||||
|
||||
exts.insert(fields);
|
||||
// This is a new span: the extensions should not be locked
|
||||
// unless some layer spawned a thread to process this span.
|
||||
// I don't think any layers do that.
|
||||
span.extensions_mut().insert(fields);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let ext = span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
data.record_fields(values);
|
||||
|
||||
// assumption: `on_record` is rarely called.
|
||||
// assumption: a span being updated by one thread,
|
||||
// and formatted by another thread is even rarer.
|
||||
let mut ext = span.extensions_mut();
|
||||
if let Some(fields) = ext.get_mut::<SpanFields>() {
|
||||
values.record(fields);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called (lazily) whenever a new log call is executed. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last one
|
||||
/// wins.
|
||||
/// Called (lazily) roughly once per event/span instance. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last field wins.
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
|
||||
debug_assert!(
|
||||
metadata.fields().len() <= MAX_TRACING_FIELDS,
|
||||
"callsite {metadata:?} has too many fields."
|
||||
);
|
||||
|
||||
if !metadata.is_event() {
|
||||
self.callsite_id(metadata.callsite());
|
||||
// register the span info.
|
||||
self.span_info(metadata);
|
||||
// Must not be never because we wouldn't get trace and span data.
|
||||
return Interest::always();
|
||||
}
|
||||
|
||||
let mut field_indices = SkippedFieldIndices::default();
|
||||
let mut seen_fields = HashMap::<&'static str, usize>::new();
|
||||
let mut seen_fields = HashMap::new();
|
||||
for field in metadata.fields() {
|
||||
use std::collections::hash_map::Entry;
|
||||
match seen_fields.entry(field.name()) {
|
||||
Entry::Vacant(entry) => {
|
||||
// field not seen yet
|
||||
entry.insert(field.index());
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
// replace currently stored index
|
||||
let old_index = entry.insert(field.index());
|
||||
// ... and append it to list of skippable indices
|
||||
field_indices.push(old_index);
|
||||
}
|
||||
if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
|
||||
field_indices.set(old_index);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,110 +344,113 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
#[repr(transparent)]
|
||||
struct CallsiteId(u32);
|
||||
/// Any span info that is fixed to a particular callsite. Not variable between span instances.
|
||||
#[derive(Clone)]
|
||||
struct CallsiteSpanInfo {
|
||||
/// index of each field to extract. usize::MAX if not found.
|
||||
extract: Arc<[usize]>,
|
||||
|
||||
impl CallsiteId {
|
||||
#[inline]
|
||||
fn next() -> Self {
|
||||
// Start at 1 to reserve 0 for default.
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
/// tracks the fixed "callsite ID" for each span.
|
||||
/// note: this is not stable between runs.
|
||||
normalized_name: Arc<str>,
|
||||
}
|
||||
|
||||
impl fmt::Display for CallsiteId {
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
impl CallsiteSpanInfo {
|
||||
fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
|
||||
// Start at 1 to reserve 0 for default.
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
|
||||
|
||||
// get all the indices of span fields we want to focus
|
||||
let extract = extract_fields
|
||||
.iter()
|
||||
// use rposition, since we want last match wins.
|
||||
.map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
|
||||
.collect();
|
||||
|
||||
// normalized_name is unique for each callsite, but it is not
|
||||
// unified across separate proxy instances.
|
||||
// todo: can we do better here?
|
||||
let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let normalized_name = format!("{}#{cid}", metadata.name()).into();
|
||||
|
||||
Self {
|
||||
extract,
|
||||
normalized_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
// TODO: Switch to custom enum with lasso::Spur for Strings?
|
||||
fields: papaya::HashMap<&'static str, serde_json::Value>,
|
||||
values: [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
|
||||
/// cached span info so we can avoid extra hashmap lookups in the hot path.
|
||||
span_info: CallsiteSpanInfo,
|
||||
}
|
||||
|
||||
impl SpanFields {
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
|
||||
fields.record(&mut SpanFieldsRecorder {
|
||||
fields: self.fields.pin(),
|
||||
});
|
||||
fn new(span_info: CallsiteSpanInfo) -> Self {
|
||||
Self {
|
||||
span_info,
|
||||
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements a tracing field visitor to convert and store values.
|
||||
struct SpanFieldsRecorder<'m, S, G> {
|
||||
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
|
||||
}
|
||||
|
||||
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
|
||||
impl tracing::field::Visit for SpanFields {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if let Ok(value) = i64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if let Ok(value) = u64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -456,38 +459,33 @@ impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecor
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
/// List of field indices skipped during logging. Can list duplicate fields or
|
||||
/// metafields not meant to be logged.
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Copy, Clone, Default)]
|
||||
struct SkippedFieldIndices {
|
||||
bits: u64,
|
||||
// 32-bits is large enough for `MAX_TRACING_FIELDS`
|
||||
bits: u32,
|
||||
}
|
||||
|
||||
impl SkippedFieldIndices {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
fn is_empty(self) -> bool {
|
||||
self.bits == 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, index: usize) {
|
||||
self.bits |= 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large");
|
||||
fn set(&mut self, index: usize) {
|
||||
debug_assert!(index <= 32, "index out of bounds of 32-bit set");
|
||||
self.bits |= 1 << index;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn contains(&self, index: usize) -> bool {
|
||||
self.bits
|
||||
& 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large")
|
||||
!= 0
|
||||
fn contains(self, index: usize) -> bool {
|
||||
self.bits & (1 << index) != 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,7 +497,7 @@ struct EventFormatter {
|
||||
|
||||
impl EventFormatter {
|
||||
#[inline]
|
||||
fn new() -> Self {
|
||||
const fn new() -> Self {
|
||||
EventFormatter {
|
||||
logline_buffer: Vec::new(),
|
||||
}
|
||||
@@ -515,14 +513,13 @@ impl EventFormatter {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S, const F: usize>(
|
||||
fn format<S>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
event: &Event<'_>,
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract_fields: &IndexSet<&'static str>,
|
||||
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
|
||||
extract_fields: &'static [&'static str],
|
||||
) -> io::Result<()>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
@@ -533,8 +530,11 @@ impl EventFormatter {
|
||||
let normalized_meta = event.normalized_metadata();
|
||||
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
|
||||
|
||||
let skipped_field_indices = skipped_field_indices.pin();
|
||||
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
|
||||
let skipped_field_indices = skipped_field_indices
|
||||
.pin()
|
||||
.get(&meta.callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut serialize = || {
|
||||
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
|
||||
@@ -565,9 +565,11 @@ impl EventFormatter {
|
||||
}
|
||||
|
||||
let spans = SerializableSpans {
|
||||
ctx,
|
||||
callsite_ids,
|
||||
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
|
||||
// collect all spans from parent to root.
|
||||
spans: ctx
|
||||
.event_span(event)
|
||||
.map_or(vec![], |parent| parent.scope().collect()),
|
||||
extracted: ExtractedSpanFields::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
|
||||
@@ -620,9 +622,9 @@ impl EventFormatter {
|
||||
}
|
||||
}
|
||||
|
||||
if spans.extract.has_values() {
|
||||
if spans.extracted.has_values() {
|
||||
// TODO: add fields from event, too?
|
||||
serializer.serialize_entry("extract", &spans.extract)?;
|
||||
serializer.serialize_entry("extract", &spans.extracted)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -635,15 +637,15 @@ impl EventFormatter {
|
||||
}
|
||||
|
||||
/// Extracts the message field that's mixed will other fields.
|
||||
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
|
||||
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Option<Result<(), S::Error>>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
@@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_none()
|
||||
&& field.name() == MESSAGE_FIELD
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
@@ -751,14 +751,14 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
|
||||
/// can be skipped.
|
||||
// This is entirely optional and only cosmetic, though maybe helps a
|
||||
// bit during log parsing in dashboards when there's no field with empty object.
|
||||
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
|
||||
struct FieldsPresent(pub bool, SkippedFieldIndices);
|
||||
|
||||
// Even though some methods have an overhead (error, bytes) it is assumed the
|
||||
// compiler won't include this since we ignore the value entirely.
|
||||
impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
impl tracing::field::Visit for FieldsPresent {
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
||||
if !self.1.is_some_and(|i| i.contains(field.index()))
|
||||
if !self.1.contains(field.index())
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
{
|
||||
@@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
}
|
||||
|
||||
/// Serializes the fields directly supplied with a log event.
|
||||
struct SerializableEventFields<'a, 'event>(
|
||||
&'a tracing::Event<'event>,
|
||||
Option<&'a SkippedFieldIndices>,
|
||||
);
|
||||
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
|
||||
|
||||
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
@@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
}
|
||||
|
||||
/// A tracing field visitor that skips the message field.
|
||||
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
|
||||
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Result<(), S::Error>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
@@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
self.state.is_ok()
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
@@ -905,18 +900,17 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
|
||||
/// with the span names as keys. To prevent collision we append a numberic value
|
||||
/// to the name. Also, collects any span fields we're interested in. Last one
|
||||
/// wins.
|
||||
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
|
||||
struct SerializableSpans<'ctx, S>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
ctx: &'a Context<'ctx, Span>,
|
||||
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract: ExtractedSpanFields<'a, F>,
|
||||
spans: Vec<SpanRef<'ctx, S>>,
|
||||
extracted: ExtractedSpanFields,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
|
||||
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
@@ -924,25 +918,22 @@ where
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.ctx.lookup_current() {
|
||||
for span in leaf_span.scope().from_root() {
|
||||
// Append a numeric callsite ID to the span name to keep the name unique
|
||||
// in the JSON object.
|
||||
let cid = self
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get(&span.metadata().callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
for span in self.spans.iter().rev() {
|
||||
let ext = span.extensions();
|
||||
|
||||
// Loki turns the # into an underscore during field name concatenation.
|
||||
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
|
||||
// all spans should have this extension.
|
||||
let Some(fields) = ext.get() else { continue };
|
||||
|
||||
serializer.serialize_value(&SerializableSpanFields {
|
||||
span: &span,
|
||||
extract: &self.extract,
|
||||
})?;
|
||||
}
|
||||
self.extracted.layer_span(fields);
|
||||
|
||||
let SpanFields { values, span_info } = fields;
|
||||
serializer.serialize_entry(
|
||||
&*span_info.normalized_name,
|
||||
&SerializableSpanFields {
|
||||
fields: span.metadata().fields(),
|
||||
values,
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -950,80 +941,77 @@ where
|
||||
}
|
||||
|
||||
/// Serializes the span fields as object.
|
||||
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
span: &'a SpanRef<'span, Span>,
|
||||
extract: &'a ExtractedSpanFields<'a, F>,
|
||||
struct SerializableSpanFields<'span> {
|
||||
fields: &'span tracing::field::FieldSet,
|
||||
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
impl serde::ser::Serialize for SerializableSpanFields<'_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let ext = self.span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (name, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(name, value)?;
|
||||
// TODO: replace clone with reference, if possible.
|
||||
self.extract.set(name, value.clone());
|
||||
for (field, value) in std::iter::zip(self.fields, self.values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
serializer.serialize_entry(field.name(), value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct ExtractedSpanFields<'a, const F: usize> {
|
||||
names: &'a IndexSet<&'static str>,
|
||||
// TODO: replace TryLock with something local thread and interior mutability.
|
||||
// serde API doesn't let us use `mut`.
|
||||
values: TryLock<([Option<serde_json::Value>; F], bool)>,
|
||||
struct ExtractedSpanFields {
|
||||
names: &'static [&'static str],
|
||||
values: RefCell<Vec<serde_json::Value>>,
|
||||
}
|
||||
|
||||
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
fn new(names: &'a IndexSet<&'static str>) -> Self {
|
||||
impl ExtractedSpanFields {
|
||||
fn new(names: &'static [&'static str]) -> Self {
|
||||
ExtractedSpanFields {
|
||||
names,
|
||||
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
|
||||
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set(&self, name: &'static str, value: serde_json::Value) {
|
||||
if let Some((index, _)) = self.names.get_full(name) {
|
||||
let mut fields = self.values.try_lock().expect("thread-local use");
|
||||
fields.0[index] = Some(value);
|
||||
fields.1 = true;
|
||||
fn layer_span(&self, fields: &SpanFields) {
|
||||
let mut v = self.values.borrow_mut();
|
||||
let SpanFields { values, span_info } = fields;
|
||||
|
||||
// extract the fields
|
||||
for (i, &j) in span_info.extract.iter().enumerate() {
|
||||
let Some(value) = values.get(j) else { continue };
|
||||
|
||||
if !value.is_null() {
|
||||
// TODO: replace clone with reference, if possible.
|
||||
v[i] = value.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn has_values(&self) -> bool {
|
||||
self.values.try_lock().expect("thread-local use").1
|
||||
self.values.borrow().iter().any(|v| !v.is_null())
|
||||
}
|
||||
}
|
||||
|
||||
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
impl serde::ser::Serialize for ExtractedSpanFields {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let values = self.values.try_lock().expect("thread-local use");
|
||||
for (i, value) in values.0.iter().enumerate() {
|
||||
if let Some(value) = value {
|
||||
let key = self.names[i];
|
||||
serializer.serialize_entry(key, value)?;
|
||||
let values = self.values.borrow();
|
||||
for (key, value) in std::iter::zip(self.names, &*values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -1032,7 +1020,6 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
@@ -1081,10 +1068,9 @@ mod tests {
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
span_info: papaya::HashMap::default(),
|
||||
writer: buffer.clone(),
|
||||
extract_fields: IndexSet::from_iter(["x"]),
|
||||
_marker: PhantomData::<[&'static str; 1]>,
|
||||
extract_fields: &["x"],
|
||||
};
|
||||
|
||||
let registry = tracing_subscriber::Registry::default().with(log_layer);
|
||||
|
||||
Reference in New Issue
Block a user