Merge commit '87915df2f' into problame/standby-horizon-leases

This commit is contained in:
Christian Schwarz
2025-08-06 17:55:06 +02:00
11 changed files with 3396 additions and 361 deletions

View File

@@ -181,6 +181,8 @@ runs:
# Ref https://github.com/neondatabase/neon/issues/4540 # Ref https://github.com/neondatabase/neon/issues/4540
# cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run) # cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=() cov_prefix=()
# Explicitly set LLVM_PROFILE_FILE to /dev/null to avoid writing *.profraw files
export LLVM_PROFILE_FILE=/dev/null
else else
cov_prefix=() cov_prefix=()
fi fi

View File

@@ -87,22 +87,27 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit secrets: inherit
lint-openapi-spec: lint-yamls:
runs-on: ubuntu-22.04 needs: [ meta, check-permissions, build-build-tools-image ]
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes. # We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }} if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps: steps:
- name: Harden the runner (Audit all outbound calls) - name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with: with:
egress-policy: audit egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
with: - run: make -C compute manifest-schema-validation
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: make lint-openapi-spec - run: make lint-openapi-spec
check-codestyle-python: check-codestyle-python:
@@ -217,28 +222,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally: build-and-test-locally:
needs: [ meta, build-build-tools-image ] needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes. # We do need to run this in `.*-rc-pr` because of hotfixes.

3
.gitignore vendored
View File

@@ -29,3 +29,6 @@ docker-compose/docker-compose-parallel.yml
# pgindent typedef lists # pgindent typedef lists
*.list *.list
# Node
**/node_modules/

View File

@@ -220,11 +220,15 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook: setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
build-tools/node_modules: build-tools/package.json
cd build-tools && $(if $(CI),npm ci,npm install)
touch build-tools/node_modules
.PHONY: lint-openapi-spec .PHONY: lint-openapi-spec
lint-openapi-spec: lint-openapi-spec: build-tools/node_modules
# operation-2xx-response: pageserver timeline delete returns 404 on success # operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\ find . -iname "openapi_spec.y*ml" -exec\
docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\ npx --prefix=build-tools/ redocly\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\ --skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\ --skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+ lint {} \+

View File

@@ -188,6 +188,12 @@ RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \ && bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt install -y nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker # Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \ && echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \

3189
build-tools/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

8
build-tools/package.json Normal file
View File

@@ -0,0 +1,8 @@
{
"name": "build-tools",
"private": true,
"devDependencies": {
"@redocly/cli": "1.34.4",
"@sourcemeta/jsonschema": "10.0.0"
}
}

View File

@@ -50,9 +50,9 @@ jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files) jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation .PHONY: manifest-schema-validation
manifest-schema-validation: node_modules manifest-schema-validation: ../build-tools/node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml npx --prefix=../build-tools/ jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json ../build-tools/node_modules: ../build-tools/package.json
npm install cd ../build-tools && $(if $(CI),npm ci,npm install)
touch node_modules touch ../build-tools/node_modules

View File

@@ -1,7 +0,0 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -6,7 +6,6 @@ use std::{env, io};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceContextExt;
use serde::ser::{SerializeMap, Serializer};
use tracing::subscriber::Interest; use tracing::subscriber::Interest;
use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -16,7 +15,9 @@ use tracing_subscriber::fmt::time::SystemTime;
use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef}; use tracing_subscriber::registry::LookupSpan;
use crate::metrics::Metrics;
/// Initialize logging and OpenTelemetry tracing and exporter. /// Initialize logging and OpenTelemetry tracing and exporter.
/// ///
@@ -249,7 +250,7 @@ where
// early, before OTel machinery, and add as event extension. // early, before OTel machinery, and add as event extension.
let now = self.clock.now(); let now = self.clock.now();
let res: io::Result<()> = EVENT_FORMATTER.with(|f| { EVENT_FORMATTER.with(|f| {
let mut borrow = f.try_borrow_mut(); let mut borrow = f.try_borrow_mut();
let formatter = match borrow.as_deref_mut() { let formatter = match borrow.as_deref_mut() {
Ok(formatter) => formatter, Ok(formatter) => formatter,
@@ -259,31 +260,19 @@ where
Err(_) => &mut EventFormatter::new(), Err(_) => &mut EventFormatter::new(),
}; };
formatter.reset();
formatter.format( formatter.format(
now, now,
event, event,
&ctx, &ctx,
&self.skipped_field_indices, &self.skipped_field_indices,
self.extract_fields, self.extract_fields,
)?; );
self.writer.make_writer().write_all(formatter.buffer())
});
// In case logging fails we generate a simpler JSON object. let mut writer = self.writer.make_writer();
if let Err(err) = res if writer.write_all(formatter.buffer()).is_err() {
&& let Ok(mut line) = serde_json::to_vec(&serde_json::json!( { Metrics::get().proxy.logging_errors_count.inc();
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), }
"level": "ERROR", });
"message": format_args!("cannot log event: {err:?}"),
"fields": {
"event": format_args!("{event:?}"),
},
}))
{
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
} }
/// Registers a SpanFields instance as span extension. /// Registers a SpanFields instance as span extension.
@@ -382,9 +371,24 @@ impl CallsiteSpanInfo {
} }
} }
#[derive(Clone)]
struct RawValue(Box<[u8]>);
impl RawValue {
fn new(v: impl json::ValueEncoder) -> Self {
Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
}
}
impl json::ValueEncoder for &RawValue {
fn encode(self, v: json::ValueSer<'_>) {
v.write_raw_json(&self.0);
}
}
/// Stores span field values recorded during the spans lifetime. /// Stores span field values recorded during the spans lifetime.
struct SpanFields { struct SpanFields {
values: [serde_json::Value; MAX_TRACING_FIELDS], values: [Option<RawValue>; MAX_TRACING_FIELDS],
/// cached span info so we can avoid extra hashmap lookups in the hot path. /// cached span info so we can avoid extra hashmap lookups in the hot path.
span_info: CallsiteSpanInfo, span_info: CallsiteSpanInfo,
@@ -394,7 +398,7 @@ impl SpanFields {
fn new(span_info: CallsiteSpanInfo) -> Self { fn new(span_info: CallsiteSpanInfo) -> Self {
Self { Self {
span_info, span_info,
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS], values: [const { None }; MAX_TRACING_FIELDS],
} }
} }
} }
@@ -402,55 +406,55 @@ impl SpanFields {
impl tracing::field::Visit for SpanFields { impl tracing::field::Visit for SpanFields {
#[inline] #[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) { if let Ok(value) = i64::try_from(value) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} else { } else {
self.values[field.index()] = serde_json::Value::from(format!("{value}")); self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
} }
} }
#[inline] #[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) { if let Ok(value) = u64::try_from(value) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} else { } else {
self.values[field.index()] = serde_json::Value::from(format!("{value}")); self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
} }
} }
#[inline] #[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) { fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.values[field.index()] = serde_json::Value::from(value); self.values[field.index()] = Some(RawValue::new(value));
} }
#[inline] #[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.values[field.index()] = serde_json::Value::from(format!("{value:?}")); self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
} }
#[inline] #[inline]
@@ -459,7 +463,7 @@ impl tracing::field::Visit for SpanFields {
field: &tracing::field::Field, field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static), value: &(dyn std::error::Error + 'static),
) { ) {
self.values[field.index()] = serde_json::Value::from(format!("{value}")); self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
} }
} }
@@ -508,11 +512,6 @@ impl EventFormatter {
&self.logline_buffer &self.logline_buffer
} }
#[inline]
fn reset(&mut self) {
self.logline_buffer.clear();
}
fn format<S>( fn format<S>(
&mut self, &mut self,
now: DateTime<Utc>, now: DateTime<Utc>,
@@ -520,8 +519,7 @@ impl EventFormatter {
ctx: &Context<'_, S>, ctx: &Context<'_, S>,
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>, skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
extract_fields: &'static [&'static str], extract_fields: &'static [&'static str],
) -> io::Result<()> ) where
where
S: Subscriber + for<'a> LookupSpan<'a>, S: Subscriber + for<'a> LookupSpan<'a>,
{ {
let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
@@ -536,78 +534,99 @@ impl EventFormatter {
.copied() .copied()
.unwrap_or_default(); .unwrap_or_default();
let mut serialize = || { self.logline_buffer.clear();
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer); let serializer = json::ValueSer::new(&mut self.logline_buffer);
json::value_as_object!(|serializer| {
let mut serializer = serializer.serialize_map(None)?;
// Timestamp comes first, so raw lines can be sorted by timestamp. // Timestamp comes first, so raw lines can be sorted by timestamp.
serializer.serialize_entry("timestamp", &timestamp)?; serializer.entry("timestamp", &*timestamp);
// Level next. // Level next.
serializer.serialize_entry("level", &meta.level().as_str())?; serializer.entry("level", meta.level().as_str());
// Message next. // Message next.
serializer.serialize_key("message")?;
let mut message_extractor = let mut message_extractor =
MessageFieldExtractor::new(serializer, skipped_field_indices); MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
event.record(&mut message_extractor); event.record(&mut message_extractor);
let mut serializer = message_extractor.into_serializer()?; message_extractor.finish();
// Direct message fields. // Direct message fields.
let mut fields_present = FieldsPresent(false, skipped_field_indices); {
event.record(&mut fields_present); let mut message_skipper = MessageFieldSkipper::new(
if fields_present.0 { serializer.key("fields").object(),
serializer.serialize_entry( skipped_field_indices,
"fields", );
&SerializableEventFields(event, skipped_field_indices), event.record(&mut message_skipper);
)?;
// rollback if no fields are present.
if message_skipper.present {
message_skipper.serializer.finish();
}
} }
let spans = SerializableSpans { let mut extracted = ExtractedSpanFields::new(extract_fields);
// collect all spans from parent to root.
spans: ctx let spans = serializer.key("spans");
json::value_as_object!(|spans| {
let parent_spans = ctx
.event_span(event) .event_span(event)
.map_or(vec![], |parent| parent.scope().collect()), .map_or(vec![], |parent| parent.scope().collect());
extracted: ExtractedSpanFields::new(extract_fields),
}; for span in parent_spans.iter().rev() {
serializer.serialize_entry("spans", &spans)?; let ext = span.extensions();
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
let span_fields = spans.key(&*span_info.normalized_name);
json::value_as_object!(|span_fields| {
for (field, value) in std::iter::zip(span.metadata().fields(), values) {
if let Some(value) = value {
span_fields.entry(field.name(), value);
}
}
});
}
});
// TODO: thread-local cache? // TODO: thread-local cache?
let pid = std::process::id(); let pid = std::process::id();
// Skip adding pid 1 to reduce noise for services running in containers. // Skip adding pid 1 to reduce noise for services running in containers.
if pid != 1 { if pid != 1 {
serializer.serialize_entry("process_id", &pid)?; serializer.entry("process_id", pid);
} }
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?; THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
// TODO: tls cache? name could change // TODO: tls cache? name could change
if let Some(thread_name) = std::thread::current().name() if let Some(thread_name) = std::thread::current().name()
&& !thread_name.is_empty() && !thread_name.is_empty()
&& thread_name != "tokio-runtime-worker" && thread_name != "tokio-runtime-worker"
{ {
serializer.serialize_entry("thread_name", thread_name)?; serializer.entry("thread_name", thread_name);
} }
if let Some(task_id) = tokio::task::try_id() { if let Some(task_id) = tokio::task::try_id() {
serializer.serialize_entry("task_id", &format_args!("{task_id}"))?; serializer.entry("task_id", format_args!("{task_id}"));
} }
serializer.serialize_entry("target", meta.target())?; serializer.entry("target", meta.target());
// Skip adding module if it's the same as target. // Skip adding module if it's the same as target.
if let Some(module) = meta.module_path() if let Some(module) = meta.module_path()
&& module != meta.target() && module != meta.target()
{ {
serializer.serialize_entry("module", module)?; serializer.entry("module", module);
} }
if let Some(file) = meta.file() { if let Some(file) = meta.file() {
if let Some(line) = meta.line() { if let Some(line) = meta.line() {
serializer.serialize_entry("src", &format_args!("{file}:{line}"))?; serializer.entry("src", format_args!("{file}:{line}"));
} else { } else {
serializer.serialize_entry("src", file)?; serializer.entry("src", file);
} }
} }
@@ -616,124 +635,104 @@ impl EventFormatter {
let otel_spanref = otel_context.span(); let otel_spanref = otel_context.span();
let span_context = otel_spanref.span_context(); let span_context = otel_spanref.span_context();
if span_context.is_valid() { if span_context.is_valid() {
serializer.serialize_entry( serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
"trace_id",
&format_args!("{}", span_context.trace_id()),
)?;
} }
} }
if spans.extracted.has_values() { if extracted.has_values() {
// TODO: add fields from event, too? // TODO: add fields from event, too?
serializer.serialize_entry("extract", &spans.extracted)?; let extract = serializer.key("extract");
json::value_as_object!(|extract| {
for (key, value) in std::iter::zip(extracted.names, extracted.values) {
if let Some(value) = value {
extract.entry(*key, &value);
}
}
});
} }
});
serializer.end()
};
serialize().map_err(io::Error::other)?;
self.logline_buffer.push(b'\n'); self.logline_buffer.push(b'\n');
Ok(())
} }
} }
/// Extracts the message field that's mixed will other fields. /// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<S: serde::ser::SerializeMap> { struct MessageFieldExtractor<'buf> {
serializer: S, serializer: Option<json::ValueSer<'buf>>,
skipped_field_indices: SkippedFieldIndices, skipped_field_indices: SkippedFieldIndices,
state: Option<Result<(), S::Error>>,
} }
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> { impl<'buf> MessageFieldExtractor<'buf> {
#[inline] #[inline]
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
Self { Self {
serializer, serializer: Some(serializer),
skipped_field_indices, skipped_field_indices,
state: None,
} }
} }
#[inline] #[inline]
fn into_serializer(mut self) -> Result<S, S::Error> { fn finish(self) {
match self.state { if let Some(ser) = self.serializer {
Some(Ok(())) => {} ser.value("");
Some(Err(err)) => return Err(err),
None => self.serializer.serialize_value("")?,
} }
Ok(self.serializer)
} }
#[inline] #[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool { fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
self.state.is_none() if field.name() == MESSAGE_FIELD
&& field.name() == MESSAGE_FIELD
&& !self.skipped_field_indices.contains(field.index()) && !self.skipped_field_indices.contains(field.index())
&& let Some(ser) = self.serializer.take()
{
ser.value(v);
}
} }
} }
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> { impl tracing::field::Visit for MessageFieldExtractor<'_> {
#[inline] #[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) { self.record_field(field, format_args!("{value:x?}"));
self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
}
} }
#[inline] #[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) { fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) { self.record_field(field, value);
self.state = Some(self.serializer.serialize_value(&value));
}
} }
#[inline] #[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) { self.record_field(field, format_args!("{value:?}"));
self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
}
} }
#[inline] #[inline]
@@ -742,147 +741,83 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
field: &tracing::field::Field, field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static), value: &(dyn std::error::Error + 'static),
) { ) {
if self.accept_field(field) { self.record_field(field, format_args!("{value}"));
self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
}
}
}
/// Checks if there's any fields and field values present. If not, the JSON subobject
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent(pub bool, SkippedFieldIndices);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.contains(field.index())
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
self.0 |= true;
}
}
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::SerializeMap;
let serializer = serializer.serialize_map(None)?;
let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
self.0.record(&mut message_skipper);
let serializer = message_skipper.into_serializer()?;
serializer.end()
} }
} }
/// A tracing field visitor that skips the message field. /// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<S: serde::ser::SerializeMap> { struct MessageFieldSkipper<'buf> {
serializer: S, serializer: json::ObjectSer<'buf>,
skipped_field_indices: SkippedFieldIndices, skipped_field_indices: SkippedFieldIndices,
state: Result<(), S::Error>, present: bool,
} }
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> { impl<'buf> MessageFieldSkipper<'buf> {
#[inline] #[inline]
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
Self { Self {
serializer, serializer,
skipped_field_indices, skipped_field_indices,
state: Ok(()), present: false,
} }
} }
#[inline] #[inline]
fn accept_field(&self, field: &tracing::field::Field) -> bool { fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
self.state.is_ok() if field.name() != MESSAGE_FIELD
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.") && !field.name().starts_with("log.")
&& !self.skipped_field_indices.contains(field.index()) && !self.skipped_field_indices.contains(field.index())
} {
self.serializer.entry(field.name(), v);
#[inline] self.present |= true;
fn into_serializer(self) -> Result<S, S::Error> { }
self.state?;
Ok(self.serializer)
} }
} }
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> { impl tracing::field::Visit for MessageFieldSkipper<'_> {
#[inline] #[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
if self.accept_field(field) { self.record_field(field, format_args!("{value:x?}"));
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:x?}"));
}
} }
#[inline] #[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) { fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if self.accept_field(field) { self.record_field(field, value);
self.state = self.serializer.serialize_entry(field.name(), &value);
}
} }
#[inline] #[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if self.accept_field(field) { self.record_field(field, format_args!("{value:?}"));
self.state = self
.serializer
.serialize_entry(field.name(), &format_args!("{value:?}"));
}
} }
#[inline] #[inline]
@@ -891,131 +826,40 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
field: &tracing::field::Field, field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static), value: &(dyn std::error::Error + 'static),
) { ) {
if self.accept_field(field) { self.record_field(field, format_args!("{value}"));
self.state = self.serializer.serialize_value(&format_args!("{value}"));
}
}
}
/// Serializes the span stack from root to leaf (parent of event) as object
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'ctx, S>
where
S: for<'lookup> LookupSpan<'lookup>,
{
spans: Vec<SpanRef<'ctx, S>>,
extracted: ExtractedSpanFields,
}
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
where
S: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
Ser: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
for span in self.spans.iter().rev() {
let ext = span.extensions();
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
self.extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
serializer.serialize_entry(
&*span_info.normalized_name,
&SerializableSpanFields {
fields: span.metadata().fields(),
values,
},
)?;
}
serializer.end()
}
}
/// Serializes the span fields as object.
struct SerializableSpanFields<'span> {
fields: &'span tracing::field::FieldSet,
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
}
impl serde::ser::Serialize for SerializableSpanFields<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
for (field, value) in std::iter::zip(self.fields, self.values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(field.name(), value)?;
}
serializer.end()
} }
} }
struct ExtractedSpanFields { struct ExtractedSpanFields {
names: &'static [&'static str], names: &'static [&'static str],
values: RefCell<Vec<serde_json::Value>>, values: Vec<Option<RawValue>>,
} }
impl ExtractedSpanFields { impl ExtractedSpanFields {
fn new(names: &'static [&'static str]) -> Self { fn new(names: &'static [&'static str]) -> Self {
ExtractedSpanFields { ExtractedSpanFields {
names, names,
values: RefCell::new(vec![serde_json::Value::Null; names.len()]), values: vec![None; names.len()],
} }
} }
fn layer_span(&self, fields: &SpanFields) { fn layer_span(&mut self, fields: &SpanFields) {
let mut v = self.values.borrow_mut();
let SpanFields { values, span_info } = fields; let SpanFields { values, span_info } = fields;
// extract the fields // extract the fields
for (i, &j) in span_info.extract.iter().enumerate() { for (i, &j) in span_info.extract.iter().enumerate() {
let Some(value) = values.get(j) else { continue }; let Some(Some(value)) = values.get(j) else {
continue;
};
if !value.is_null() { // TODO: replace clone with reference, if possible.
// TODO: replace clone with reference, if possible. self.values[i] = Some(value.clone());
v[i] = value.clone();
}
} }
} }
#[inline] #[inline]
fn has_values(&self) -> bool { fn has_values(&self) -> bool {
self.values.borrow().iter().any(|v| !v.is_null()) self.values.iter().any(|v| v.is_some())
}
}
impl serde::ser::Serialize for ExtractedSpanFields {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let values = self.values.borrow();
for (key, value) in std::iter::zip(self.names, &*values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(key, value)?;
}
serializer.end()
} }
} }

View File

@@ -112,6 +112,9 @@ pub struct ProxyMetrics {
/// Number of bytes sent/received between all clients and backends. /// Number of bytes sent/received between all clients and backends.
pub io_bytes: CounterVec<StaticLabelSet<Direction>>, pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
/// Number of IO errors while logging.
pub logging_errors_count: Counter,
/// Number of errors by a given classification. /// Number of errors by a given classification.
pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>, pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,