diff --git a/Cargo.lock b/Cargo.lock index 772b1f50c6..7aa9c53e7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4385,9 +4385,9 @@ dependencies = [ [[package]] name = "papaya" -version = "0.1.8" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c" +checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd" dependencies = [ "equivalent", "seize", @@ -6110,9 +6110,9 @@ dependencies = [ [[package]] name = "seize" -version = "0.4.9" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93" +checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7" dependencies = [ "libc", "windows-sys 0.52.0", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 5964b76ecf..b6e3f03a81 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -53,7 +53,7 @@ measured = { workspace = true, features = ["lasso"] } metrics.workspace = true once_cell.workspace = true opentelemetry = { workspace = true, features = ["trace"] } -papaya = "0.1.8" +papaya = "0.2.0" parking_lot.workspace = true parquet.workspace = true parquet_derive.workspace = true diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 3c34918d84..b2e95a109f 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,9 +1,11 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::hash::BuildHasher; -use std::{env, io}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::{array, env, fmt, io}; use chrono::{DateTime, Utc}; +use indexmap::IndexSet; use opentelemetry::trace::TraceContextExt; use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; @@ -17,6 +19,7 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; +use try_lock::TryLock; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -46,13 +49,13 @@ pub async fn init() -> anyhow::Result { let otlp_layer = tracing_utils::init_tracing("proxy").await; let json_log_layer = if logfmt == LogFormat::Json { - Some(JsonLoggingLayer { - clock: RealClock, - skipped_field_indices: papaya::HashMap::default(), - writer: StderrWriter { + Some(JsonLoggingLayer::new( + RealClock, + StderrWriter { stderr: std::io::stderr(), }, - }) + ["request_id", "session_id", "conn_id"], + )) } else { None }; @@ -191,13 +194,39 @@ thread_local! { } /// Implements tracing layer to handle events specific to logging. -struct JsonLoggingLayer { +struct JsonLoggingLayer { clock: C, skipped_field_indices: papaya::HashMap, + callsite_ids: papaya::HashMap, writer: W, + // We use a const generic and arrays to bypass one heap allocation. + extract_fields: IndexSet<&'static str>, + _marker: std::marker::PhantomData<[&'static str; F]>, } -impl Layer for JsonLoggingLayer +impl JsonLoggingLayer { + fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self { + JsonLoggingLayer { + clock, + skipped_field_indices: papaya::HashMap::default(), + callsite_ids: papaya::HashMap::default(), + writer, + extract_fields: IndexSet::from_iter(extract_fields), + _marker: std::marker::PhantomData, + } + } + + #[inline] + fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId { + *self + .callsite_ids + .pin() + .get_or_insert_with(cs, CallsiteId::next) + } +} + +impl Layer + for JsonLoggingLayer where S: Subscriber + for<'a> LookupSpan<'a>, { @@ -211,7 +240,14 @@ where let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| { if entered.get() { let mut formatter = EventFormatter::new(); - formatter.format(now, event, &ctx, &self.skipped_field_indices)?; + formatter.format::( + now, + event, + &ctx, + &self.skipped_field_indices, + &self.callsite_ids, + &self.extract_fields, + )?; self.writer.make_writer().write_all(formatter.buffer()) } else { entered.set(true); @@ -219,7 +255,14 @@ where EVENT_FORMATTER.with_borrow_mut(move |formatter| { formatter.reset(); - formatter.format(now, event, &ctx, &self.skipped_field_indices)?; + formatter.format::( + now, + event, + &ctx, + &self.skipped_field_indices, + &self.callsite_ids, + &self.extract_fields, + )?; self.writer.make_writer().write_all(formatter.buffer()) }) } @@ -243,13 +286,17 @@ where /// Registers a SpanFields instance as span extension. fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + let csid = self.callsite_id(attrs.metadata().callsite()); let span = ctx.span(id).expect("span must exist"); let fields = SpanFields::default(); fields.record_fields(attrs); // This could deadlock when there's a panic somewhere in the tracing // event handling and a read or write guard is still held. This includes // the OTel subscriber. - span.extensions_mut().insert(fields); + let mut exts = span.extensions_mut(); + + exts.insert(fields); + exts.insert(csid); } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { @@ -265,6 +312,7 @@ where /// wins. fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { if !metadata.is_event() { + self.callsite_id(metadata.callsite()); // Must not be never because we wouldn't get trace and span data. return Interest::always(); } @@ -297,6 +345,26 @@ where } } +#[derive(Copy, Clone, Debug, Default)] +#[repr(transparent)] +struct CallsiteId(u32); + +impl CallsiteId { + #[inline] + fn next() -> Self { + // Start at 1 to reserve 0 for default. + static COUNTER: AtomicU32 = AtomicU32::new(1); + CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed)) + } +} + +impl fmt::Display for CallsiteId { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + /// Stores span field values recorded during the spans lifetime. #[derive(Default)] struct SpanFields { @@ -448,12 +516,14 @@ impl EventFormatter { self.logline_buffer.clear(); } - fn format( + fn format( &mut self, now: DateTime, event: &Event<'_>, ctx: &Context<'_, S>, skipped_field_indices: &papaya::HashMap, + callsite_ids: &papaya::HashMap, + extract_fields: &IndexSet<&'static str>, ) -> io::Result<()> where S: Subscriber + for<'a> LookupSpan<'a>, @@ -485,6 +555,7 @@ impl EventFormatter { event.record(&mut message_extractor); let mut serializer = message_extractor.into_serializer()?; + // Direct message fields. let mut fields_present = FieldsPresent(false, skipped_field_indices); event.record(&mut fields_present); if fields_present.0 { @@ -494,7 +565,9 @@ impl EventFormatter { )?; } + // TODO: thread-local cache? let pid = std::process::id(); + // Skip adding pid 1 to reduce noise for services running in containers. if pid != 1 { serializer.serialize_entry("process_id", &pid)?; } @@ -514,6 +587,7 @@ impl EventFormatter { serializer.serialize_entry("target", meta.target())?; + // Skip adding module if it's the same as target. if let Some(module) = meta.module_path() { if module != meta.target() { serializer.serialize_entry("module", module)?; @@ -540,7 +614,16 @@ impl EventFormatter { } } - serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?; + let stack = SerializableSpans { + ctx, + callsite_ids, + fields: ExtractedSpanFields::<'_, F>::new(extract_fields), + }; + serializer.serialize_entry("spans", &stack)?; + + if stack.fields.has_values() { + serializer.serialize_entry("extract", &stack.fields)?; + } serializer.end() }; @@ -818,15 +901,20 @@ impl tracing::field::Visit for MessageFieldSkipper< } } -/// Serializes the span stack from root to leaf (parent of event) enumerated -/// inside an object where the keys are just the number padded with zeroes -/// to retain sorting order. -// The object is necessary because Loki cannot flatten arrays. -struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>) +/// Serializes the span stack from root to leaf (parent of event) as object +/// with the span names as keys. To prevent collision we append a numberic value +/// to the name. Also, collects any span fields we're interested in. Last one +/// wins. +struct SerializableSpans<'a, 'ctx, Span, const F: usize> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>; + Span: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + ctx: &'a Context<'ctx, Span>, + callsite_ids: &'a papaya::HashMap, + fields: ExtractedSpanFields<'a, F>, +} -impl serde::ser::Serialize for SerializableSpanStack<'_, '_, Span> +impl serde::ser::Serialize for SerializableSpans<'_, '_, Span, F> where Span: Subscriber + for<'lookup> LookupSpan<'lookup>, { @@ -836,9 +924,24 @@ where { let mut serializer = serializer.serialize_map(None)?; - if let Some(leaf_span) = self.0.lookup_current() { - for (i, span) in leaf_span.scope().from_root().enumerate() { - serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?; + if let Some(leaf_span) = self.ctx.lookup_current() { + for span in leaf_span.scope().from_root() { + // Append a numeric callsite ID to the span name to keep the name unique + // in the JSON object. + let cid = self + .callsite_ids + .pin() + .get(&span.metadata().callsite()) + .copied() + .unwrap_or_default(); + + // Loki turns the # into an underscore during field name concatenation. + serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?; + + serializer.serialize_value(&SerializableSpanFields { + span: &span, + fields: &self.fields, + })?; } } @@ -846,28 +949,79 @@ where } } -/// Serializes a single span. Include the span ID, name and its fields as -/// recorded up to this point. -struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>) -where - Span: for<'lookup> LookupSpan<'lookup>; - -impl serde::ser::Serialize for SerializableSpan<'_, '_, Span> +/// Serializes the span fields as object. +struct SerializableSpanFields<'a, 'span, Span, const F: usize> where Span: for<'lookup> LookupSpan<'lookup>, { - fn serialize(&self, serializer: Ser) -> Result + span: &'a SpanRef<'span, Span>, + fields: &'a ExtractedSpanFields<'a, F>, +} + +impl serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F> +where + Span: for<'lookup> LookupSpan<'lookup>, +{ + fn serialize(&self, serializer: S) -> Result where - Ser: serde::ser::Serializer, + S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - // TODO: the span ID is probably only useful for debugging tracing. - serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?; - serializer.serialize_entry("span_name", self.0.metadata().name())?; - let ext = self.0.extensions(); + let ext = self.span.extensions(); if let Some(data) = ext.get::() { - for (key, value) in &data.fields.pin() { + for (name, value) in &data.fields.pin() { + serializer.serialize_entry(name, value)?; + // TODO: replace clone with reference, if possible. + self.fields.set(name, value.clone()); + } + } + + serializer.end() + } +} + +struct ExtractedSpanFields<'a, const F: usize> { + names: &'a IndexSet<&'static str>, + // TODO: replace TryLock with something local thread and interior mutability. + // serde API doesn't let us use `mut`. + values: TryLock<([Option; F], bool)>, +} + +impl<'a, const F: usize> ExtractedSpanFields<'a, F> { + fn new(names: &'a IndexSet<&'static str>) -> Self { + ExtractedSpanFields { + names, + values: TryLock::new((array::from_fn(|_| Option::default()), false)), + } + } + + #[inline] + fn set(&self, name: &'static str, value: serde_json::Value) { + if let Some((index, _)) = self.names.get_full(name) { + let mut fields = self.values.try_lock().expect("thread-local use"); + fields.0[index] = Some(value); + fields.1 = true; + } + } + + #[inline] + fn has_values(&self) -> bool { + self.values.try_lock().expect("thread-local use").1 + } +} + +impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::ser::Serializer, + { + let mut serializer = serializer.serialize_map(None)?; + + let values = self.values.try_lock().expect("thread-local use"); + for (i, value) in values.0.iter().enumerate() { + if let Some(value) = value { + let key = self.names[i]; serializer.serialize_entry(key, value)?; } } @@ -879,6 +1033,7 @@ where #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { + use std::marker::PhantomData; use std::sync::{Arc, Mutex, MutexGuard}; use assert_json_diff::assert_json_eq; @@ -927,14 +1082,17 @@ mod tests { let log_layer = JsonLoggingLayer { clock: clock.clone(), skipped_field_indices: papaya::HashMap::default(), + callsite_ids: papaya::HashMap::default(), writer: buffer.clone(), + extract_fields: IndexSet::from_iter(["x"]), + _marker: PhantomData::<[&'static str; 1]>, }; let registry = tracing_subscriber::Registry::default().with(log_layer); tracing::subscriber::with_default(registry, || { - info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| { - info_span!("span2").in_scope(|| { + info_span!("some_span", x = 24).in_scope(|| { + info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| { tracing::error!( a = 1, a = 2, @@ -960,16 +1118,16 @@ mod tests { "a": 3, }, "spans": { - "00":{ - "span_id": "0000000000000001", - "span_name": "span1", - "x": 42, + "some_span#1":{ + "x": 24, }, - "01": { - "span_id": "0000000000000002", - "span_name": "span2", + "some_span#2": { + "x": 42, } }, + "extract": { + "x": 42, + }, "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(), "target": "proxy::logging::tests", "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),