mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
add libmetric metric for each logged log message (#4055)
This patch extends the libmetrics logging setup functionality with a `tracing` layer that increments a Prometheus counter each time we log a log message. We have the counter per tracing event level. This allows for monitoring WARN and ERR log volume without parsing the log. Also, it would allow cross-checking whether logs got dropped on the way into Loki. It would be nicer if we could hook deeper into the tracing logging layer, to avoid evaluating the filter twice. But I don't know how to do it.
This commit is contained in:
committed by
GitHub
parent
afbbc61036
commit
e83684b868
@@ -1,6 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
@@ -23,25 +24,64 @@ impl LogFormat {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
let default_filter_str = "info";
|
||||
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
metrics::register_int_counter_vec!(
|
||||
"libmetrics_tracing_event_count",
|
||||
"Number of tracing events, by level",
|
||||
&["level"]
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
|
||||
|
||||
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
|
||||
where
|
||||
S: tracing::Subscriber,
|
||||
{
|
||||
fn on_event(
|
||||
&self,
|
||||
event: &tracing::Event<'_>,
|
||||
_ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let level = event.metadata().level();
|
||||
let level = match *level {
|
||||
tracing::Level::ERROR => "error",
|
||||
tracing::Level::WARN => "warn",
|
||||
tracing::Level::INFO => "info",
|
||||
tracing::Level::DEBUG => "debug",
|
||||
tracing::Level::TRACE => "trace",
|
||||
};
|
||||
self.0.with_label_values(&[level]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
|
||||
let rust_log_env_filter = || {
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
||||
};
|
||||
|
||||
let base_logger = tracing_subscriber::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
|
||||
match log_format {
|
||||
LogFormat::Json => base_logger.json().init(),
|
||||
LogFormat::Plain => base_logger.init(),
|
||||
LogFormat::Test => base_logger.with_test_writer().init(),
|
||||
}
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
tracing_subscriber::registry()
|
||||
.with({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
let log_layer = match log_format {
|
||||
LogFormat::Json => log_layer.json().boxed(),
|
||||
LogFormat::Plain => log_layer.boxed(),
|
||||
LogFormat::Test => log_layer.with_test_writer().boxed(),
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
})
|
||||
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
<Self as std::fmt::Display>::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
use super::TracingEventCountLayer;
|
||||
|
||||
#[test]
|
||||
fn tracing_event_count_metric() {
|
||||
let counter_vec =
|
||||
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
|
||||
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
|
||||
let layer = TracingEventCountLayer(counter_vec);
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
|
||||
tracing::trace!("foo");
|
||||
tracing::debug!("foo");
|
||||
tracing::info!("foo");
|
||||
tracing::warn!("foo");
|
||||
tracing::error!("foo");
|
||||
});
|
||||
|
||||
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1201,6 +1201,36 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
)
|
||||
}
|
||||
|
||||
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum Level {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct Request {
|
||||
level: Level,
|
||||
message: String,
|
||||
}
|
||||
let body: Request = json_request(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
|
||||
match body.level {
|
||||
Level::Error => tracing::error!(?body.message),
|
||||
Level::Warn => tracing::warn!(?body.message),
|
||||
Level::Info => tracing::info!(?body.message),
|
||||
Level::Debug => tracing::debug!(?body.message),
|
||||
Level::Trace => tracing::trace!(?body.message),
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
@@ -1341,5 +1371,9 @@ pub fn make_router(
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.post(
|
||||
"/v1/tracing/event",
|
||||
testing_api!("emit a tracing event", post_tracing_event_handler),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_storage_operations_seconds_global_bucket",
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -550,3 +550,13 @@ class PageserverHttpClient(requests.Session):
|
||||
def tenant_break(self, tenant_id: TenantId):
|
||||
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
|
||||
self.verbose_error(res)
|
||||
|
||||
def post_tracing_event(self, level: str, message: str):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tracing/event",
|
||||
json={
|
||||
"level": level,
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
49
test_runner/regress/test_logging.py
Normal file
49
test_runner/regress/test_logging.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
|
||||
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
|
||||
# self-test: make sure the event is logged (i.e., our testing endpoint works)
|
||||
log_expected = {
|
||||
"trace": False,
|
||||
"debug": False,
|
||||
"info": True,
|
||||
"warn": True,
|
||||
"error": True,
|
||||
}[level]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
msg_id = uuid.uuid4().hex
|
||||
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
|
||||
# post the event
|
||||
ps_http.post_tracing_event(level, msg_id)
|
||||
if log_expected:
|
||||
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
|
||||
|
||||
def assert_logged():
|
||||
if not log_expected:
|
||||
return
|
||||
assert env.pageserver.log_contains(f".*{msg_id}.*")
|
||||
|
||||
wait_until(10, 0.5, assert_logged)
|
||||
|
||||
# make sure it's counted
|
||||
def assert_metric_value():
|
||||
if not log_expected:
|
||||
return
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
val = val or 0.0
|
||||
log.info("libmetrics_tracing_event_count: %s", val)
|
||||
assert val > (before or 0.0)
|
||||
|
||||
wait_until(10, 1, assert_metric_value)
|
||||
Reference in New Issue
Block a user