proxy: JSON logging field refactor (#11078)

## Problem

Grafana Loki's JSON handling is somewhat limited and the log message
should be structured in a way that it's easy to sift through logs and
filter.

## Summary of changes

* Drop span_id. It's too short lived to be of value and only bloats the
logs.
* Use the span's name as the object key, but append a unique numeric
value to prevent name collisions.
* Extract interesting span fields into a separate object at the root.

New format:
```json
{
  "timestamp": "2025-03-04T18:54:44.134435Z",
  "level": "INFO",
  "message": "connected to compute node at 127.0.0.1 (127.0.0.1:5432) latency=client: 22.002292ms, cplane: 0ns, compute: 5.338875ms, retry: 0ns",
  "fields": {
    "cold_start_info": "unknown"
  },
  "process_id": 56675,
  "thread_id": 9122892,
  "task_id": "24",
  "target": "proxy::compute",
  "src": "proxy/src/compute.rs:288",
  "trace_id": "5eb89b840ec63fee5fc56cebd633e197",
  "spans": {
    "connect_request#1": {
      "ep": "endpoint",
      "role": "proxy",
      "session_id": "b8a41818-12bd-4c3f-8ef0-9a942cc99514",
      "protocol": "tcp",
      "conn_info": "127.0.0.1"
    },
    "connect_to_compute#6": {},
    "connect_once#8": {
      "compute_id": "compute",
      "pid": "853"
    }
  },
  "extract": {
    "session_id": "b8a41818-12bd-4c3f-8ef0-9a942cc99514"
  }
}
```
This commit is contained in:
Folke Behrens
2025-03-05 10:27:46 +00:00
committed by GitHub
parent 906d7468cc
commit 8e51bfc597
3 changed files with 209 additions and 51 deletions

View File

@@ -53,7 +53,7 @@ measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
once_cell.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
papaya = "0.1.8"
papaya = "0.2.0"
parking_lot.workspace = true
parquet.workspace = true
parquet_derive.workspace = true

View File

@@ -1,9 +1,11 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::{env, io};
use std::sync::atomic::{AtomicU32, Ordering};
use std::{array, env, fmt, io};
use chrono::{DateTime, Utc};
use indexmap::IndexSet;
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
@@ -17,6 +19,7 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
use try_lock::TryLock;
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -46,13 +49,13 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer {
clock: RealClock,
skipped_field_indices: papaya::HashMap::default(),
writer: StderrWriter {
Some(JsonLoggingLayer::new(
RealClock,
StderrWriter {
stderr: std::io::stderr(),
},
})
["request_id", "session_id", "conn_id"],
))
} else {
None
};
@@ -191,13 +194,39 @@ thread_local! {
}
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
clock: C,
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
writer: W,
// We use a const generic and arrays to bypass one heap allocation.
extract_fields: IndexSet<&'static str>,
_marker: std::marker::PhantomData<[&'static str; F]>,
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
JsonLoggingLayer {
clock,
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
writer,
extract_fields: IndexSet::from_iter(extract_fields),
_marker: std::marker::PhantomData,
}
}
#[inline]
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
*self
.callsite_ids
.pin()
.get_or_insert_with(cs, CallsiteId::next)
}
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
for JsonLoggingLayer<C, W, F>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
@@ -211,7 +240,14 @@ where
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
if entered.get() {
let mut formatter = EventFormatter::new();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
} else {
entered.set(true);
@@ -219,7 +255,14 @@ where
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
formatter.reset();
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
})
}
@@ -243,13 +286,17 @@ where
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let csid = self.callsite_id(attrs.metadata().callsite());
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
span.extensions_mut().insert(fields);
let mut exts = span.extensions_mut();
exts.insert(fields);
exts.insert(csid);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
@@ -265,6 +312,7 @@ where
/// wins.
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
if !metadata.is_event() {
self.callsite_id(metadata.callsite());
// Must not be never because we wouldn't get trace and span data.
return Interest::always();
}
@@ -297,6 +345,26 @@ where
}
}
#[derive(Copy, Clone, Debug, Default)]
#[repr(transparent)]
struct CallsiteId(u32);
impl CallsiteId {
#[inline]
fn next() -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
}
}
impl fmt::Display for CallsiteId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// Stores span field values recorded during the spans lifetime.
#[derive(Default)]
struct SpanFields {
@@ -448,12 +516,14 @@ impl EventFormatter {
self.logline_buffer.clear();
}
fn format<S>(
fn format<S, const F: usize>(
&mut self,
now: DateTime<Utc>,
event: &Event<'_>,
ctx: &Context<'_, S>,
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
extract_fields: &IndexSet<&'static str>,
) -> io::Result<()>
where
S: Subscriber + for<'a> LookupSpan<'a>,
@@ -485,6 +555,7 @@ impl EventFormatter {
event.record(&mut message_extractor);
let mut serializer = message_extractor.into_serializer()?;
// Direct message fields.
let mut fields_present = FieldsPresent(false, skipped_field_indices);
event.record(&mut fields_present);
if fields_present.0 {
@@ -494,7 +565,9 @@ impl EventFormatter {
)?;
}
// TODO: thread-local cache?
let pid = std::process::id();
// Skip adding pid 1 to reduce noise for services running in containers.
if pid != 1 {
serializer.serialize_entry("process_id", &pid)?;
}
@@ -514,6 +587,7 @@ impl EventFormatter {
serializer.serialize_entry("target", meta.target())?;
// Skip adding module if it's the same as target.
if let Some(module) = meta.module_path() {
if module != meta.target() {
serializer.serialize_entry("module", module)?;
@@ -540,7 +614,16 @@ impl EventFormatter {
}
}
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
let stack = SerializableSpans {
ctx,
callsite_ids,
fields: ExtractedSpanFields::<'_, F>::new(extract_fields),
};
serializer.serialize_entry("spans", &stack)?;
if stack.fields.has_values() {
serializer.serialize_entry("extract", &stack.fields)?;
}
serializer.end()
};
@@ -818,15 +901,20 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
}
}
/// Serializes the span stack from root to leaf (parent of event) enumerated
/// inside an object where the keys are just the number padded with zeroes
/// to retain sorting order.
// The object is necessary because Loki cannot flatten arrays.
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
/// Serializes the span stack from root to leaf (parent of event) as object
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
ctx: &'a Context<'ctx, Span>,
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
fields: ExtractedSpanFields<'a, F>,
}
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
@@ -836,9 +924,24 @@ where
{
let mut serializer = serializer.serialize_map(None)?;
if let Some(leaf_span) = self.0.lookup_current() {
for (i, span) in leaf_span.scope().from_root().enumerate() {
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
if let Some(leaf_span) = self.ctx.lookup_current() {
for span in leaf_span.scope().from_root() {
// Append a numeric callsite ID to the span name to keep the name unique
// in the JSON object.
let cid = self
.callsite_ids
.pin()
.get(&span.metadata().callsite())
.copied()
.unwrap_or_default();
// Loki turns the # into an underscore during field name concatenation.
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
serializer.serialize_value(&SerializableSpanFields {
span: &span,
fields: &self.fields,
})?;
}
}
@@ -846,28 +949,79 @@ where
}
}
/// Serializes a single span. Include the span ID, name and its fields as
/// recorded up to this point.
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
where
Span: for<'lookup> LookupSpan<'lookup>;
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
/// Serializes the span fields as object.
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
span: &'a SpanRef<'span, Span>,
fields: &'a ExtractedSpanFields<'a, F>,
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Ser: serde::ser::Serializer,
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
// TODO: the span ID is probably only useful for debugging tracing.
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
serializer.serialize_entry("span_name", self.0.metadata().name())?;
let ext = self.0.extensions();
let ext = self.span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
for (key, value) in &data.fields.pin() {
for (name, value) in &data.fields.pin() {
serializer.serialize_entry(name, value)?;
// TODO: replace clone with reference, if possible.
self.fields.set(name, value.clone());
}
}
serializer.end()
}
}
struct ExtractedSpanFields<'a, const F: usize> {
names: &'a IndexSet<&'static str>,
// TODO: replace TryLock with something local thread and interior mutability.
// serde API doesn't let us use `mut`.
values: TryLock<([Option<serde_json::Value>; F], bool)>,
}
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
fn new(names: &'a IndexSet<&'static str>) -> Self {
ExtractedSpanFields {
names,
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
}
}
#[inline]
fn set(&self, name: &'static str, value: serde_json::Value) {
if let Some((index, _)) = self.names.get_full(name) {
let mut fields = self.values.try_lock().expect("thread-local use");
fields.0[index] = Some(value);
fields.1 = true;
}
}
#[inline]
fn has_values(&self) -> bool {
self.values.try_lock().expect("thread-local use").1
}
}
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let values = self.values.try_lock().expect("thread-local use");
for (i, value) in values.0.iter().enumerate() {
if let Some(value) = value {
let key = self.names[i];
serializer.serialize_entry(key, value)?;
}
}
@@ -879,6 +1033,7 @@ where
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, MutexGuard};
use assert_json_diff::assert_json_eq;
@@ -927,14 +1082,17 @@ mod tests {
let log_layer = JsonLoggingLayer {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
writer: buffer.clone(),
extract_fields: IndexSet::from_iter(["x"]),
_marker: PhantomData::<[&'static str; 1]>,
};
let registry = tracing_subscriber::Registry::default().with(log_layer);
tracing::subscriber::with_default(registry, || {
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
info_span!("span2").in_scope(|| {
info_span!("some_span", x = 24).in_scope(|| {
info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
tracing::error!(
a = 1,
a = 2,
@@ -960,16 +1118,16 @@ mod tests {
"a": 3,
},
"spans": {
"00":{
"span_id": "0000000000000001",
"span_name": "span1",
"x": 42,
"some_span#1":{
"x": 24,
},
"01": {
"span_id": "0000000000000002",
"span_name": "span2",
"some_span#2": {
"x": 42,
}
},
"extract": {
"x": 42,
},
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
"target": "proxy::logging::tests",
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),