From e83684b8683847a5f467809cd7dd8e2ccdc9bffa Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 25 Apr 2023 14:10:18 +0200
Subject: [PATCH] add libmetric metric for each logged log message (#4055)

This patch extends the libmetrics logging setup functionality with a
`tracing` layer that increments a Prometheus counter each time we log a
log message. We have the counter per tracing event level. This allows
for monitoring WARN and ERR log volume without parsing the log. Also, it
would allow cross-checking whether logs got dropped on the way into
Loki.

It would be nicer if we could hook deeper into the tracing logging
layer, to avoid evaluating the filter twice. But I don't know how to do
it.
---
 libs/utils/src/logging.rs               | 100 ++++++++++++++++++++----
 pageserver/src/http/routes.rs           |  34 ++++++++
 test_runner/fixtures/metrics.py         |   1 +
 test_runner/fixtures/pageserver/http.py |  10 +++
 test_runner/regress/test_logging.py     |  49 ++++++++++++
 5 files changed, 179 insertions(+), 15 deletions(-)
 create mode 100644 test_runner/regress/test_logging.py

diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index f770622a60..ed856b6804 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -1,6 +1,7 @@
 use std::str::FromStr;
 
 use anyhow::Context;
+use once_cell::sync::Lazy;
 use strum_macros::{EnumString, EnumVariantNames};
 
 #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -23,25 +24,64 @@ impl LogFormat {
     }
 }
 
-pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
-    let default_filter_str = "info";
+static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
+    metrics::register_int_counter_vec!(
+        "libmetrics_tracing_event_count",
+        "Number of tracing events, by level",
+        &["level"]
+    )
+    .expect("failed to define metric")
+});
 
+struct TracingEventCountLayer(&'static metrics::IntCounterVec);
+
+impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
+where
+    S: tracing::Subscriber,
+{
+    fn on_event(
+        &self,
+        event: &tracing::Event<'_>,
+        _ctx: tracing_subscriber::layer::Context<'_, S>,
+    ) {
+        let level = event.metadata().level();
+        let level = match *level {
+            tracing::Level::ERROR => "error",
+            tracing::Level::WARN => "warn",
+            tracing::Level::INFO => "info",
+            tracing::Level::DEBUG => "debug",
+            tracing::Level::TRACE => "trace",
+        };
+        self.0.with_label_values(&[level]).inc();
+    }
+}
+
+pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
     // We fall back to printing all spans at info-level or above if
     // the RUST_LOG environment variable is not set.
-    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
-        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
+    let rust_log_env_filter = || {
+        tracing_subscriber::EnvFilter::try_from_default_env()
+            .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
+    };
 
-    let base_logger = tracing_subscriber::fmt()
-        .with_env_filter(env_filter)
-        .with_target(false)
-        .with_ansi(atty::is(atty::Stream::Stdout))
-        .with_writer(std::io::stdout);
-
-    match log_format {
-        LogFormat::Json => base_logger.json().init(),
-        LogFormat::Plain => base_logger.init(),
-        LogFormat::Test => base_logger.with_test_writer().init(),
-    }
+    // NB: the order of the with() calls does not matter.
+    // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
+    use tracing_subscriber::prelude::*;
+    tracing_subscriber::registry()
+        .with({
+            let log_layer = tracing_subscriber::fmt::layer()
+                .with_target(false)
+                .with_ansi(atty::is(atty::Stream::Stdout))
+                .with_writer(std::io::stdout);
+            let log_layer = match log_format {
+                LogFormat::Json => log_layer.json().boxed(),
+                LogFormat::Plain => log_layer.boxed(),
+                LogFormat::Test => log_layer.with_test_writer().boxed(),
+            };
+            log_layer.with_filter(rust_log_env_filter())
+        })
+        .with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
+        .init();
 
     Ok(())
 }
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
         <Self as std::fmt::Display>::fmt(self, f)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use metrics::{core::Opts, IntCounterVec};
+
+    use super::TracingEventCountLayer;
+
+    #[test]
+    fn tracing_event_count_metric() {
+        let counter_vec =
+            IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
+        let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
+        let layer = TracingEventCountLayer(counter_vec);
+        use tracing_subscriber::prelude::*;
+
+        tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
+            tracing::trace!("foo");
+            tracing::debug!("foo");
+            tracing::info!("foo");
+            tracing::warn!("foo");
+            tracing::error!("foo");
+        });
+
+        assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
+        assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
+        assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
+        assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
+        assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
+    }
+}
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 06a97f6dff..3318e5263c 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1201,6 +1201,36 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
     )
 }
 
+async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
+    #[derive(Debug, serde::Deserialize)]
+    #[serde(rename_all = "lowercase")]
+    enum Level {
+        Error,
+        Warn,
+        Info,
+        Debug,
+        Trace,
+    }
+    #[derive(Debug, serde::Deserialize)]
+    struct Request {
+        level: Level,
+        message: String,
+    }
+    let body: Request = json_request(&mut r)
+        .await
+        .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
+
+    match body.level {
+        Level::Error => tracing::error!(?body.message),
+        Level::Warn => tracing::warn!(?body.message),
+        Level::Info => tracing::info!(?body.message),
+        Level::Debug => tracing::debug!(?body.message),
+        Level::Trace => tracing::trace!(?body.message),
+    }
+
+    json_response(StatusCode::OK, ())
+}
+
 pub fn make_router(
     conf: &'static PageServerConf,
     launch_ts: &'static LaunchTimestamp,
@@ -1341,5 +1371,9 @@ pub fn make_router(
             testing_api!("set tenant state to broken", handle_tenant_break),
         )
         .get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
+        .post(
+            "/v1/tracing/event",
+            testing_api!("emit a tracing event", post_tracing_event_handler),
+        )
         .any(handler_404))
 }
diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py
index 2984f2c7d3..c88b985c8e 100644
--- a/test_runner/fixtures/metrics.py
+++ b/test_runner/fixtures/metrics.py
@@ -53,6 +53,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
     "pageserver_storage_operations_seconds_global_bucket",
     "libmetrics_launch_timestamp",
     "libmetrics_build_info",
+    "libmetrics_tracing_event_count_total",
 )
 
 PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 69042478c7..cf92aeb6c0 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -550,3 +550,13 @@ class PageserverHttpClient(requests.Session):
     def tenant_break(self, tenant_id: TenantId):
         res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
         self.verbose_error(res)
+
+    def post_tracing_event(self, level: str, message: str):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tracing/event",
+            json={
+                "level": level,
+                "message": message,
+            },
+        )
+        self.verbose_error(res)
diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py
new file mode 100644
index 0000000000..d559be0a8f
--- /dev/null
+++ b/test_runner/regress/test_logging.py
@@ -0,0 +1,49 @@
+import uuid
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.utils import wait_until
+
+
+@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
+def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
+    # self-test: make sure the event is logged (i.e., our testing endpoint works)
+    log_expected = {
+        "trace": False,
+        "debug": False,
+        "info": True,
+        "warn": True,
+        "error": True,
+    }[level]
+
+    env = neon_env_builder.init_start()
+    ps_http = env.pageserver.http_client()
+    msg_id = uuid.uuid4().hex
+
+    # NB: the _total suffix is added by our prometheus client
+    before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
+
+    # post the event
+    ps_http.post_tracing_event(level, msg_id)
+    if log_expected:
+        env.pageserver.allowed_errors.append(f".*{msg_id}.*")
+
+    def assert_logged():
+        if not log_expected:
+            return
+        assert env.pageserver.log_contains(f".*{msg_id}.*")
+
+    wait_until(10, 0.5, assert_logged)
+
+    # make sure it's counted
+    def assert_metric_value():
+        if not log_expected:
+            return
+        # NB: the _total suffix is added by our prometheus client
+        val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
+        val = val or 0.0
+        log.info("libmetrics_tracing_event_count: %s", val)
+        assert val > (before or 0.0)
+
+    wait_until(10, 1, assert_metric_value)