mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 08:22:55 +00:00
1128 lines
35 KiB
Rust
1128 lines
35 KiB
Rust
use std::cell::RefCell;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicU32, Ordering};
|
|
use std::{env, io};
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use opentelemetry::trace::TraceContextExt;
|
|
use serde::ser::{SerializeMap, Serializer};
|
|
use tracing::subscriber::Interest;
|
|
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
|
|
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
|
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
|
|
use tracing_subscriber::fmt::format::{Format, Full};
|
|
use tracing_subscriber::fmt::time::SystemTime;
|
|
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
|
use tracing_subscriber::layer::{Context, Layer};
|
|
use tracing_subscriber::prelude::*;
|
|
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
|
|
|
/// Initialize logging and OpenTelemetry tracing and exporter.
///
/// Logging can be configured using `RUST_LOG` environment variable.
///
/// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
/// configuration from environment variables. For example, to change the
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
pub async fn init() -> anyhow::Result<LoggingGuard> {
    let logfmt = LogFormat::from_env()?;

    // Default to INFO; `RUST_LOG` can override, but the two directives added
    // below are appended last and therefore always win for their targets.
    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy()
        .add_directive(
            "aws_config=info"
                .parse()
                .expect("this should be a valid filter directive"),
        )
        .add_directive(
            "azure_core::policies::transport=off"
                .parse()
                .expect("this should be a valid filter directive"),
        );

    let otlp_layer =
        tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;

    // At most one of the two fmt layers below is Some, selected by LOGFMT.
    let json_log_layer = if logfmt == LogFormat::Json {
        Some(JsonLoggingLayer::new(
            RealClock,
            StderrWriter {
                stderr: std::io::stderr(),
            },
            // Span fields surfaced into the top-level "extract" JSON object.
            &["conn_id", "ep", "query_id", "request_id", "session_id"],
        ))
    } else {
        None
    };

    let text_log_layer = if logfmt == LogFormat::Text {
        Some(
            tracing_subscriber::fmt::layer()
                .with_ansi(false)
                .with_writer(std::io::stderr)
                .with_target(false),
        )
    } else {
        None
    };

    tracing_subscriber::registry()
        .with(env_filter)
        .with(otlp_layer)
        .with(json_log_layer)
        .with(text_log_layer)
        .try_init()?;

    Ok(LoggingGuard)
}
|
|
|
|
/// Initialize logging for local_proxy with log prefix and no opentelemetry.
///
/// Logging can be configured using `RUST_LOG` environment variable.
pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
    // Default to INFO unless overridden via RUST_LOG.
    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();

    // Plain-text output to stderr with a "[local_proxy] " prefix per event.
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_ansi(false)
        .with_writer(std::io::stderr)
        .event_format(LocalProxyFormatter(Format::default().with_target(false)));

    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .try_init()?;

    Ok(LoggingGuard)
}
|
|
|
|
/// Event formatter that wraps the default text format, prefixing every
/// emitted line with `[local_proxy] `.
pub struct LocalProxyFormatter(Format<Full, SystemTime>);
|
|
|
|
impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N: for<'a> FormatFields<'a> + 'static,
{
    /// Writes the `[local_proxy] ` prefix, then delegates to the inner
    /// default formatter for the rest of the line.
    fn format_event(
        &self,
        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
        mut writer: tracing_subscriber::fmt::format::Writer<'_>,
        event: &tracing::Event<'_>,
    ) -> std::fmt::Result {
        writer.write_str("[local_proxy] ")?;
        self.0.format_event(ctx, writer, event)
    }
}
|
|
|
|
/// Guard returned by the init functions; flushes and shuts down the tracing
/// pipeline when dropped.
pub struct LoggingGuard;
|
|
|
|
impl Drop for LoggingGuard {
    fn drop(&mut self) {
        // Shutdown trace pipeline gracefully, so that it has a chance to send any
        // pending traces before we exit.
        tracing::info!("shutting down the tracing machinery");
        tracing_utils::shutdown_tracing();
    }
}
|
|
|
|
/// Output format for proxy logs, selected via the `LOGFMT` env var.
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
    /// Human-readable text output (tracing_subscriber's default fmt layer).
    Text,
    /// One JSON object per line (the default).
    #[default]
    Json,
}
|
|
|
|
impl LogFormat {
|
|
fn from_env() -> anyhow::Result<Self> {
|
|
let logfmt = env::var("LOGFMT");
|
|
Ok(match logfmt.as_deref() {
|
|
Err(_) => LogFormat::default(),
|
|
Ok("text") => LogFormat::Text,
|
|
Ok("json") => LogFormat::Json,
|
|
Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Minimal writer factory used by `JsonLoggingLayer`; a simplified version of
/// `tracing_subscriber::fmt::MakeWriter` without the lifetime parameter.
trait MakeWriter {
    fn make_writer(&self) -> impl io::Write;
}
|
|
|
|
/// `MakeWriter` implementation that writes to the process's stderr.
struct StderrWriter {
    stderr: io::Stderr,
}
|
|
|
|
impl MakeWriter for StderrWriter {
    /// Locks stderr so a whole log line is written without interleaving
    /// with other threads.
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        self.stderr.lock()
    }
}
|
|
|
|
// TODO: move into separate module or even separate crate.
/// Source of the current time; abstracted so tests can inject a fixed clock.
trait Clock {
    fn now(&self) -> DateTime<Utc>;
}
|
|
|
|
/// `Clock` implementation backed by the system clock.
struct RealClock;
|
|
|
|
impl Clock for RealClock {
    #[inline]
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }
}
|
|
|
|
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";

/// Tracing used to enforce that spans/events have no more than 32 fields.
/// It seems this is no longer the case, but it's still documented in some places.
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
/// rely on it for some (minor) performance gains.
// Must stay at 32: `SkippedFieldIndices` uses a `u32` bitset sized to this.
const MAX_TRACING_FIELDS: usize = 32;
|
|
|
|
thread_local! {
    /// Thread-local instance with per-thread buffer for log writing.
    // `const` init keeps the TLS access path cheap (no lazy-init branch).
    static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
    /// Cached OS thread ID.
    static THREAD_ID: u64 = gettid::gettid();
}
|
|
|
|
/// Map for values fixed at callsite registration.
// We use papaya here because registration rarely happens post-startup.
// papaya is good for read-heavy workloads.
//
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
type CallsiteMap<T> =
    papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
|
|
|
|
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
    /// Timestamp source; injectable for tests.
    clock: C,
    /// Destination for finished log lines.
    writer: W,

    /// tracks which fields of each **event** are duplicates
    skipped_field_indices: CallsiteMap<SkippedFieldIndices>,

    /// Per-callsite span metadata (extract indices, normalized name).
    span_info: CallsiteMap<CallsiteSpanInfo>,

    /// Fields we want to keep track of in a separate json object.
    extract_fields: &'static [&'static str],
}
|
|
|
|
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
    /// Creates a layer writing JSON lines via `writer`, timestamped by `clock`,
    /// surfacing the span fields named in `extract_fields`.
    fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
        JsonLoggingLayer {
            clock,
            skipped_field_indices: CallsiteMap::default(),
            span_info: CallsiteMap::default(),
            writer,
            extract_fields,
        }
    }

    /// Returns the cached `CallsiteSpanInfo` for `metadata`, computing and
    /// inserting it on first use. Cloning is cheap (two `Arc`s).
    #[inline]
    fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
        self.span_info
            .pin()
            .get_or_insert_with(metadata.callsite(), || {
                CallsiteSpanInfo::new(metadata, self.extract_fields)
            })
            .clone()
    }
}
|
|
|
|
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Formats the event into the thread-local buffer and writes it out.
    /// On any failure, falls back to emitting a minimal JSON error object.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        use std::io::Write;

        // TODO: consider special tracing subscriber to grab timestamp very
        // early, before OTel machinery, and add as event extension.
        let now = self.clock.now();

        let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
            let mut borrow = f.try_borrow_mut();
            let formatter = match borrow.as_deref_mut() {
                Ok(formatter) => formatter,
                // If the thread local formatter is borrowed,
                // then we likely hit an edge case where we panicked during formatting.
                // We allow the logging to proceed with an uncached formatter.
                Err(_) => &mut EventFormatter::new(),
            };

            formatter.reset();
            formatter.format(
                now,
                event,
                &ctx,
                &self.skipped_field_indices,
                self.extract_fields,
            )?;
            self.writer.make_writer().write_all(formatter.buffer())
        });

        // In case logging fails we generate a simpler JSON object.
        if let Err(err) = res
            && let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
                "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": format_args!("cannot log event: {err:?}"),
                "fields": {
                    "event": format_args!("{event:?}"),
                },
            }))
        {
            line.push(b'\n');
            // Best-effort: if even the fallback write fails, drop the event.
            self.writer.make_writer().write_all(&line).ok();
        }
    }

    /// Registers a SpanFields instance as span extension.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");

        let mut fields = SpanFields::new(self.span_info(span.metadata()));
        attrs.record(&mut fields);

        // This is a new span: the extensions should not be locked
        // unless some layer spawned a thread to process this span.
        // I don't think any layers do that.
        span.extensions_mut().insert(fields);
    }

    /// Updates the stored `SpanFields` when a span records new field values.
    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");

        // assumption: `on_record` is rarely called.
        // assumption: a span being updated by one thread,
        // and formatted by another thread is even rarer.
        let mut ext = span.extensions_mut();
        if let Some(fields) = ext.get_mut::<SpanFields>() {
            values.record(fields);
        }
    }

    /// Called (lazily) roughly once per event/span instance. We quickly check
    /// for duplicate field names and record duplicates as skippable. Last field wins.
    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
        debug_assert!(
            metadata.fields().len() <= MAX_TRACING_FIELDS,
            "callsite {metadata:?} has too many fields."
        );

        if !metadata.is_event() {
            // register the span info.
            self.span_info(metadata);
            // Must not be never because we wouldn't get trace and span data.
            return Interest::always();
        }

        // Record earlier occurrences of repeated field names; only the last
        // occurrence of a name is kept when formatting.
        let mut field_indices = SkippedFieldIndices::default();
        let mut seen_fields = HashMap::new();
        for field in metadata.fields() {
            if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
                field_indices.set(old_index);
            }
        }

        if !field_indices.is_empty() {
            self.skipped_field_indices
                .pin()
                .insert(metadata.callsite(), field_indices);
        }

        Interest::always()
    }
}
|
|
|
|
/// Any span info that is fixed to a particular callsite. Not variable between span instances.
#[derive(Clone)]
struct CallsiteSpanInfo {
    /// index of each field to extract. usize::MAX if not found.
    extract: Arc<[usize]>,

    /// Span name suffixed with a per-callsite counter ("name#N") so that
    /// nested spans with the same name don't collide as JSON keys.
    /// note: this is not stable between runs.
    normalized_name: Arc<str>,
}
|
|
|
|
impl CallsiteSpanInfo {
    /// Computes the extract-field indices and the unique normalized name for
    /// one span callsite.
    fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
        // Start at 1 to reserve 0 for default.
        static COUNTER: AtomicU32 = AtomicU32::new(1);

        let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();

        // get all the indices of span fields we want to focus
        let extract = extract_fields
            .iter()
            // use rposition, since we want last match wins.
            .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
            .collect();

        // normalized_name is unique for each callsite, but it is not
        // unified across separate proxy instances.
        // todo: can we do better here?
        let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
        let normalized_name = format!("{}#{cid}", metadata.name()).into();

        Self {
            extract,
            normalized_name,
        }
    }
}
|
|
|
|
/// Stores span field values recorded during the spans lifetime.
struct SpanFields {
    // Indexed by `tracing::field::Field::index()`; unrecorded slots are Null.
    values: [serde_json::Value; MAX_TRACING_FIELDS],

    /// cached span info so we can avoid extra hashmap lookups in the hot path.
    span_info: CallsiteSpanInfo,
}
|
|
|
|
impl SpanFields {
|
|
fn new(span_info: CallsiteSpanInfo) -> Self {
|
|
Self {
|
|
span_info,
|
|
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
|
|
}
|
|
}
|
|
}
|
|
|
|
// Records each field value as a `serde_json::Value` in the slot given by the
// field's callsite index. 128-bit integers fall back to strings when they
// don't fit in 64 bits; errors and Debug values are stringified.
impl tracing::field::Visit for SpanFields {
    #[inline]
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
        // Keep numeric form when it fits; otherwise store as a string.
        if let Ok(value) = i64::try_from(value) {
            self.values[field.index()] = serde_json::Value::from(value);
        } else {
            self.values[field.index()] = serde_json::Value::from(format!("{value}"));
        }
    }

    #[inline]
    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
        // Keep numeric form when it fits; otherwise store as a string.
        if let Ok(value) = u64::try_from(value) {
            self.values[field.index()] = serde_json::Value::from(value);
        } else {
            self.values[field.index()] = serde_json::Value::from(format!("{value}"));
        }
    }

    #[inline]
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.values[field.index()] = serde_json::Value::from(value);
    }

    #[inline]
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
    }

    #[inline]
    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        // Display form, not Debug, for user-facing error text.
        self.values[field.index()] = serde_json::Value::from(format!("{value}"));
    }
}
|
|
|
|
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
#[derive(Copy, Clone, Default)]
struct SkippedFieldIndices {
    // 32-bits is large enough for `MAX_TRACING_FIELDS`
    bits: u32,
}
|
|
|
|
impl SkippedFieldIndices {
|
|
#[inline]
|
|
fn is_empty(self) -> bool {
|
|
self.bits == 0
|
|
}
|
|
|
|
#[inline]
|
|
fn set(&mut self, index: usize) {
|
|
debug_assert!(index <= 32, "index out of bounds of 32-bit set");
|
|
self.bits |= 1 << index;
|
|
}
|
|
|
|
#[inline]
|
|
fn contains(self, index: usize) -> bool {
|
|
self.bits & (1 << index) != 0
|
|
}
|
|
}
|
|
|
|
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
// TODO: buffer capacity management, truncate if too large
struct EventFormatter {
    // Reused across events on the same thread (see EVENT_FORMATTER TLS).
    logline_buffer: Vec<u8>,
}
|
|
|
|
impl EventFormatter {
    /// `const` so it can initialize the thread-local without a lazy branch.
    #[inline]
    const fn new() -> Self {
        EventFormatter {
            logline_buffer: Vec::new(),
        }
    }

    /// Returns the formatted line, including the trailing newline.
    #[inline]
    fn buffer(&self) -> &[u8] {
        &self.logline_buffer
    }

    /// Clears the buffer for reuse; capacity is retained.
    #[inline]
    fn reset(&mut self) {
        self.logline_buffer.clear();
    }

    /// Serializes one event as a single JSON line into the internal buffer.
    ///
    /// Key order is deliberate: timestamp, level, message first, then fields,
    /// spans, and process/thread/trace metadata. Any serialization error is
    /// mapped to `io::Error` for the caller's fallback path.
    fn format<S>(
        &mut self,
        now: DateTime<Utc>,
        event: &Event<'_>,
        ctx: &Context<'_, S>,
        skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
        extract_fields: &'static [&'static str],
    ) -> io::Result<()>
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);

        // Handle events forwarded from the `log` crate: they carry their real
        // metadata in normalized form.
        use tracing_log::NormalizeEvent;
        let normalized_meta = event.normalized_metadata();
        let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());

        let skipped_field_indices = skipped_field_indices
            .pin()
            .get(&meta.callsite())
            .copied()
            .unwrap_or_default();

        let mut serialize = || {
            let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);

            let mut serializer = serializer.serialize_map(None)?;

            // Timestamp comes first, so raw lines can be sorted by timestamp.
            serializer.serialize_entry("timestamp", &timestamp)?;

            // Level next.
            serializer.serialize_entry("level", &meta.level().as_str())?;

            // Message next.
            // The key is written eagerly; MessageFieldExtractor supplies the
            // value (or "" if the event has no message field).
            serializer.serialize_key("message")?;
            let mut message_extractor =
                MessageFieldExtractor::new(serializer, skipped_field_indices);
            event.record(&mut message_extractor);
            let mut serializer = message_extractor.into_serializer()?;

            // Direct message fields.
            let mut fields_present = FieldsPresent(false, skipped_field_indices);
            event.record(&mut fields_present);
            if fields_present.0 {
                serializer.serialize_entry(
                    "fields",
                    &SerializableEventFields(event, skipped_field_indices),
                )?;
            }

            let spans = SerializableSpans {
                // collect all spans from parent to root.
                spans: ctx
                    .event_span(event)
                    .map_or(vec![], |parent| parent.scope().collect()),
                extracted: ExtractedSpanFields::new(extract_fields),
            };
            serializer.serialize_entry("spans", &spans)?;

            // TODO: thread-local cache?
            let pid = std::process::id();
            // Skip adding pid 1 to reduce noise for services running in containers.
            if pid != 1 {
                serializer.serialize_entry("process_id", &pid)?;
            }

            THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;

            // TODO: tls cache? name could change
            if let Some(thread_name) = std::thread::current().name()
                && !thread_name.is_empty()
                && thread_name != "tokio-runtime-worker"
            {
                serializer.serialize_entry("thread_name", thread_name)?;
            }

            if let Some(task_id) = tokio::task::try_id() {
                serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
            }

            serializer.serialize_entry("target", meta.target())?;

            // Skip adding module if it's the same as target.
            if let Some(module) = meta.module_path()
                && module != meta.target()
            {
                serializer.serialize_entry("module", module)?;
            }

            if let Some(file) = meta.file() {
                if let Some(line) = meta.line() {
                    serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
                } else {
                    serializer.serialize_entry("src", file)?;
                }
            }

            {
                // Attach the OpenTelemetry trace id, if a valid trace is active.
                let otel_context = Span::current().context();
                let otel_spanref = otel_context.span();
                let span_context = otel_spanref.span_context();
                if span_context.is_valid() {
                    serializer.serialize_entry(
                        "trace_id",
                        &format_args!("{}", span_context.trace_id()),
                    )?;
                }
            }

            // `extracted` was populated as a side effect of serializing "spans".
            if spans.extracted.has_values() {
                // TODO: add fields from event, too?
                serializer.serialize_entry("extract", &spans.extracted)?;
            }

            serializer.end()
        };

        serialize().map_err(io::Error::other)?;
        self.logline_buffer.push(b'\n');
        Ok(())
    }
}
|
|
|
|
/// Extracts the message field that's mixed in with other fields.
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
    serializer: S,
    skipped_field_indices: SkippedFieldIndices,
    // None until the message field is visited; then the result of writing it.
    state: Option<Result<(), S::Error>>,
}
|
|
|
|
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
    #[inline]
    fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
        Self {
            serializer,
            skipped_field_indices,
            state: None,
        }
    }

    /// Finishes extraction and hands the serializer back.
    ///
    /// If no message field was visited, writes an empty string so the
    /// already-written "message" key always gets a value.
    #[inline]
    fn into_serializer(mut self) -> Result<S, S::Error> {
        match self.state {
            Some(Ok(())) => {}
            Some(Err(err)) => return Err(err),
            None => self.serializer.serialize_value("")?,
        }
        Ok(self.serializer)
    }

    /// Accepts only the first non-skipped field named `message`.
    #[inline]
    fn accept_field(&self, field: &tracing::field::Field) -> bool {
        self.state.is_none()
            && field.name() == MESSAGE_FIELD
            && !self.skipped_field_indices.contains(field.index())
    }
}
|
|
|
|
// Each visitor method serializes the value only when it is the accepted
// message field; the write result is stashed in `state`.
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
    #[inline]
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
        // Bytes are rendered as a hex Debug string.
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
        }
    }

    #[inline]
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&value));
        }
    }

    #[inline]
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
        }
    }

    #[inline]
    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        if self.accept_field(field) {
            self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
        }
    }
}
|
|
|
|
/// Checks if there's any fields and field values present. If not, the JSON subobject
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent(pub bool, SkippedFieldIndices);
|
|
|
|
// Even though some methods have an overhead (error, bytes) it is assumed the
|
|
// compiler won't include this since we ignore the value entirely.
|
|
impl tracing::field::Visit for FieldsPresent {
|
|
#[inline]
|
|
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
|
if !self.1.contains(field.index())
|
|
&& field.name() != MESSAGE_FIELD
|
|
&& !field.name().starts_with("log.")
|
|
{
|
|
self.0 |= true;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
|
|
|
|
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
    /// Emits a JSON object of the event's fields, with the message field and
    /// duplicates filtered out by `MessageFieldSkipper`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        use serde::ser::SerializeMap;
        let serializer = serializer.serialize_map(None)?;
        let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
        self.0.record(&mut message_skipper);
        let serializer = message_skipper.into_serializer()?;
        serializer.end()
    }
}
|
|
|
|
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
    serializer: S,
    skipped_field_indices: SkippedFieldIndices,
    // First serialization error encountered; stops further writes.
    state: Result<(), S::Error>,
}
|
|
|
|
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
    #[inline]
    fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
        Self {
            serializer,
            skipped_field_indices,
            state: Ok(()),
        }
    }

    /// Accepts every field except the message, `log.*` metadata fields, and
    /// skipped duplicates — and only while no error has occurred.
    #[inline]
    fn accept_field(&self, field: &tracing::field::Field) -> bool {
        self.state.is_ok()
            && field.name() != MESSAGE_FIELD
            && !field.name().starts_with("log.")
            && !self.skipped_field_indices.contains(field.index())
    }

    /// Returns the serializer, or the first error encountered while visiting.
    #[inline]
    fn into_serializer(self) -> Result<S, S::Error> {
        self.state?;
        Ok(self.serializer)
    }
}
|
|
|
|
// Each visitor method writes `field name -> value` into the map for every
// accepted field; the first error is stored and halts further writes.
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
    #[inline]
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
        // Bytes are rendered as a hex Debug string.
        if self.accept_field(field) {
            self.state = self
                .serializer
                .serialize_entry(field.name(), &format_args!("{value:x?}"));
        }
    }

    #[inline]
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if self.accept_field(field) {
            self.state = self.serializer.serialize_entry(field.name(), &value);
        }
    }

    #[inline]
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        if self.accept_field(field) {
            self.state = self
                .serializer
                .serialize_entry(field.name(), &format_args!("{value:?}"));
        }
    }

    #[inline]
    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        if self.accept_field(field) {
            // Bug fix: this previously called `serialize_value` without a key,
            // which emits a keyless value into the JSON map (malformed output).
            // Write the entry with the field name like every other method.
            self.state = self
                .serializer
                .serialize_entry(field.name(), &format_args!("{value}"));
        }
    }
}
|
|
|
|
/// Serializes the span stack from root to leaf (parent of event) as object
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'ctx, S>
where
    S: for<'lookup> LookupSpan<'lookup>,
{
    // Ordered leaf-to-root; serialization iterates in reverse (root first).
    spans: Vec<SpanRef<'ctx, S>>,
    // Filled as a side effect of serialization (hence interior mutability).
    extracted: ExtractedSpanFields,
}
|
|
|
|
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
where
    S: for<'lookup> LookupSpan<'lookup>,
{
    /// Writes one `normalized_name -> {fields}` entry per span, root first,
    /// while layering extract-field values into `self.extracted`.
    fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
    where
        Ser: serde::ser::Serializer,
    {
        let mut serializer = serializer.serialize_map(None)?;

        for span in self.spans.iter().rev() {
            let ext = span.extensions();

            // all spans should have this extension.
            let Some(fields) = ext.get() else { continue };

            // Collect extract-fields; later (leaf-ward) spans overwrite earlier.
            self.extracted.layer_span(fields);

            let SpanFields { values, span_info } = fields;
            serializer.serialize_entry(
                &*span_info.normalized_name,
                &SerializableSpanFields {
                    fields: span.metadata().fields(),
                    values,
                },
            )?;
        }

        serializer.end()
    }
}
|
|
|
|
/// Serializes the span fields as object.
struct SerializableSpanFields<'span> {
    // Field names come from the callsite metadata; values are index-aligned.
    fields: &'span tracing::field::FieldSet,
    values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
}
|
|
|
|
impl serde::ser::Serialize for SerializableSpanFields<'_> {
    /// Emits a JSON object containing only the fields that were actually
    /// recorded (non-null slots), keyed by field name.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        let mut map = serializer.serialize_map(None)?;

        let recorded = std::iter::zip(self.fields, self.values).filter(|(_, v)| !v.is_null());
        for (field, value) in recorded {
            map.serialize_entry(field.name(), value)?;
        }

        map.end()
    }
}
|
|
|
|
/// Accumulates the values of the configured extract fields across the span
/// stack; `values` is index-aligned with `names`.
struct ExtractedSpanFields {
    names: &'static [&'static str],
    // RefCell: mutated during serialization of the span stack (&self context).
    values: RefCell<Vec<serde_json::Value>>,
}
|
|
|
|
impl ExtractedSpanFields {
    fn new(names: &'static [&'static str]) -> Self {
        ExtractedSpanFields {
            names,
            values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
        }
    }

    /// Merges one span's extract-field values in; non-null values overwrite,
    /// so spans layered later (closer to the event) win.
    fn layer_span(&self, fields: &SpanFields) {
        let mut v = self.values.borrow_mut();
        let SpanFields { values, span_info } = fields;

        // extract the fields
        // `j == usize::MAX` (field absent at this callsite) fails the `get`.
        for (i, &j) in span_info.extract.iter().enumerate() {
            let Some(value) = values.get(j) else { continue };

            if !value.is_null() {
                // TODO: replace clone with reference, if possible.
                v[i] = value.clone();
            }
        }
    }

    /// True if at least one extract field was found with a non-null value.
    #[inline]
    fn has_values(&self) -> bool {
        self.values.borrow().iter().any(|v| !v.is_null())
    }
}
|
|
|
|
impl serde::ser::Serialize for ExtractedSpanFields {
    /// Emits a JSON object of the extract fields that ended up with a
    /// non-null value, keyed by the configured field name.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        let mut map = serializer.serialize_map(None)?;

        let values = self.values.borrow();
        let present = std::iter::zip(self.names, values.iter()).filter(|(_, v)| !v.is_null());
        for (key, value) in present {
            map.serialize_entry(key, value)?;
        }

        map.end()
    }
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Deterministic clock for tests.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            *self.current_time.lock().expect("poisoned")
        }
    }

    /// Writer that appends into a shared in-memory buffer.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            VecWriter {
                buffer: self.lock().expect("poisoned"),
            }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.buffer.write(buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// End-to-end check of the JSON layer: duplicate event fields collapse to
    /// the last value, span names get `#N` suffixes, and extract fields are
    /// surfaced with last-span-wins semantics.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let buffer = Arc::new(Mutex::new(Vec::new()));
        let log_layer = JsonLoggingLayer {
            clock: clock.clone(),
            skipped_field_indices: papaya::HashMap::default(),
            span_info: papaya::HashMap::default(),
            writer: buffer.clone(),
            extract_fields: &["x"],
        };

        let registry = tracing_subscriber::Registry::default().with(log_layer);

        tracing::subscriber::with_default(registry, || {
            info_span!("some_span", x = 24).in_scope(|| {
                info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
                    tracing::error!(
                        a = 1,
                        a = 2,
                        a = 3,
                        message = "explicit message field",
                        "implicit message field"
                    );
                });
            });
        });

        let buffer = Arc::try_unwrap(buffer)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
        // Environment-dependent entries (src, pid, tid) are copied from the
        // actual output so the comparison only pins the deterministic parts.
        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "some_span#1":{
                        "x": 24,
                    },
                    "some_span#2": {
                        "x": 42,
                    }
                },
                "extract": {
                    "x": 42,
                },
                "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
                "target": "proxy::logging::tests",
                "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
                "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|