mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
remove locking from extract, use refcell instead
This commit is contained in:
@@ -18,7 +18,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use try_lock::TryLock;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -577,11 +576,18 @@ impl EventFormatter {
|
||||
)?;
|
||||
}
|
||||
|
||||
let spans = SerializableSpans {
|
||||
ctx,
|
||||
extract: ExtractedSpanFields::new(state),
|
||||
let extracted = {
|
||||
let spans = SerializableSpans {
|
||||
spans: ctx
|
||||
.event_span(event)
|
||||
.map_or(vec![], |parent| parent.scope().collect_vec()),
|
||||
extract: ExtractedSpanFields::new(state),
|
||||
};
|
||||
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
|
||||
spans.extract
|
||||
};
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
|
||||
// TODO: thread-local cache?
|
||||
let pid = std::process::id();
|
||||
@@ -632,9 +638,9 @@ impl EventFormatter {
|
||||
}
|
||||
}
|
||||
|
||||
if spans.extract.has_values() {
|
||||
if extracted.has_values() {
|
||||
// TODO: add fields from event, too?
|
||||
serializer.serialize_entry("extract", &spans.extract)?;
|
||||
serializer.serialize_entry("extract", &extracted)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -910,17 +916,17 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
|
||||
/// with the span names as keys. To prevent collision we append a numberic value
|
||||
/// to the name. Also, collects any span fields we're interested in. Last one
|
||||
/// wins.
|
||||
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
|
||||
struct SerializableSpans<'a, 'ctx, S, const F: usize>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
ctx: &'a Context<'ctx, Span>,
|
||||
spans: Vec<SpanRef<'ctx, S>>,
|
||||
extract: ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
@@ -928,27 +934,33 @@ where
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.ctx.lookup_current() {
|
||||
for span in leaf_span.scope().from_root() {
|
||||
// Append a numeric callsite ID to the span name to keep the name unique
|
||||
// in the JSON object.
|
||||
let cid = self
|
||||
.extract
|
||||
.state
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get(&span.metadata().callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
for span in self.spans.iter().rev() {
|
||||
// Append a numeric callsite ID to the span name to keep the name unique
|
||||
// in the JSON object.
|
||||
let cid = self
|
||||
.extract
|
||||
.state
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get(&span.metadata().callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
|
||||
// Loki turns the # into an underscore during field name concatenation.
|
||||
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
|
||||
let ext = span.extensions();
|
||||
|
||||
serializer.serialize_value(&SerializableSpanFields {
|
||||
span: &span,
|
||||
// all spans must have this extension
|
||||
let Some(fields) = ext.get::<SpanFields>() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Loki turns the # into an underscore during field name concatenation.
|
||||
serializer.serialize_entry(
|
||||
&format_args!("{}#{}", span.metadata().name(), &cid),
|
||||
&SerializableSpanFields {
|
||||
fields,
|
||||
extract: &self.extract,
|
||||
})?;
|
||||
}
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -956,34 +968,25 @@ where
|
||||
}
|
||||
|
||||
/// Serializes the span fields as object.
|
||||
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
span: &'a SpanRef<'span, Span>,
|
||||
struct SerializableSpanFields<'a, const F: usize> {
|
||||
fields: &'a SpanFields,
|
||||
extract: &'a ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
impl<const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, F> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let ext = self.span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (name, value) in &data.fields {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(name, value)?;
|
||||
self.extract.set(name, value);
|
||||
for (name, value) in &self.fields.fields {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(name, value)?;
|
||||
self.extract.set(name, value);
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -992,16 +995,14 @@ where
|
||||
|
||||
struct ExtractedSpanFields<'a, const F: usize> {
|
||||
state: &'a JsonLoggingState<F>,
|
||||
// TODO: replace TryLock with something local thread and interior mutability.
|
||||
// serde API doesn't let us use `mut`.
|
||||
values: TryLock<([Option<serde_json::Value>; F], bool)>,
|
||||
values: RefCell<[serde_json::Value; F]>,
|
||||
}
|
||||
|
||||
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
fn new(state: &'a JsonLoggingState<F>) -> Self {
|
||||
ExtractedSpanFields {
|
||||
state,
|
||||
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
|
||||
values: RefCell::new(array::from_fn(|_| serde_json::Value::Null)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1009,16 +1010,15 @@ impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
fn set(&self, name: &'static str, value: &serde_json::Value) {
|
||||
// expectation: this list is small, so a linear search is fast.
|
||||
if let Some(index) = self.state.extract_fields.iter().position(|x| *x == name) {
|
||||
let mut fields = self.values.try_lock().expect("thread-local use");
|
||||
let mut fields = self.values.borrow_mut();
|
||||
// TODO: replace clone with reference, if possible.
|
||||
fields.0[index] = Some(value.clone());
|
||||
fields.1 = true;
|
||||
fields[index] = value.clone();
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn has_values(&self) -> bool {
|
||||
self.values.try_lock().expect("thread-local use").1
|
||||
self.values.borrow().iter().any(|v| !v.is_null())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1029,12 +1029,14 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let values = self.values.try_lock().expect("thread-local use");
|
||||
for (i, value) in values.0.iter().enumerate() {
|
||||
if let Some(value) = value {
|
||||
let key = self.state.extract_fields[i];
|
||||
serializer.serialize_entry(key, value)?;
|
||||
let values = self.values.borrow();
|
||||
for (i, value) in values.iter().enumerate() {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = self.state.extract_fields[i];
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
|
||||
Reference in New Issue
Block a user