From 4c4e33bc2e70cb56fa796c380f58631ec6083fe6 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:11:35 +0400 Subject: [PATCH] storage: add http/https server and cert resover metrics (#11450) ## Problem We need to export some metrics about certs/connections to configure alerts and make sure that all HTTP requests are gone before turning https-only mode on. - Closes: https://github.com/neondatabase/cloud/issues/25526 ## Summary of changes - Add started connection and connection error metrics to http/https Server. - Add certificate expiration time and reload metrics to ReloadingCertificateResolver. --- Cargo.lock | 1 + libs/http-utils/Cargo.toml | 1 + libs/http-utils/src/server.rs | 43 +++++++++++++ libs/http-utils/src/tls_certs.rs | 106 ++++++++++++++++++++++++++++--- pageserver/src/bin/pageserver.rs | 1 + safekeeper/src/http/mod.rs | 1 + storage_controller/src/main.rs | 1 + test_runner/regress/test_ssl.py | 61 ++++++++++++++++++ 8 files changed, 207 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aea8924f4f..5d2cdcea27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2837,6 +2837,7 @@ dependencies = [ "utils", "uuid", "workspace_hack", + "x509-cert", ] [[package]] diff --git a/libs/http-utils/Cargo.toml b/libs/http-utils/Cargo.toml index 6d24ee352a..5f6578f76e 100644 --- a/libs/http-utils/Cargo.toml +++ b/libs/http-utils/Cargo.toml @@ -30,6 +30,7 @@ tokio.workspace = true tracing.workspace = true url.workspace = true uuid.workspace = true +x509-cert.workspace = true # to use tokio channels as streams, this is faster to compile than async_stream # why is it only here? no other crate should use it, streams are rarely needed. diff --git a/libs/http-utils/src/server.rs b/libs/http-utils/src/server.rs index 07fd56ac01..f93f71c962 100644 --- a/libs/http-utils/src/server.rs +++ b/libs/http-utils/src/server.rs @@ -4,6 +4,8 @@ use futures::StreamExt; use futures::stream::FuturesUnordered; use hyper0::Body; use hyper0::server::conn::Http; +use metrics::{IntCounterVec, register_int_counter_vec}; +use once_cell::sync::Lazy; use routerify::{RequestService, RequestServiceBuilder}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::TlsAcceptor; @@ -26,6 +28,24 @@ pub struct Server { tls_acceptor: Option, } +static CONNECTION_STARTED_COUNT: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "http_server_connection_started_total", + "Number of established http/https connections", + &["scheme"] + ) + .expect("failed to define a metric") +}); + +static CONNECTION_ERROR_COUNT: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "http_server_connection_errors_total", + "Number of occured connection errors by type", + &["type"] + ) + .expect("failed to define a metric") +}); + impl Server { pub fn new( request_service: Arc>, @@ -60,6 +80,15 @@ impl Server { false } + let tcp_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tcp"]); + let tls_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tls"]); + let http_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["http"]); + let https_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["https"]); + let panic_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["panic"]); + + let http_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["http"]); + let https_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["https"]); + let mut connections = FuturesUnordered::new(); loop { tokio::select! { @@ -67,6 +96,7 @@ impl Server { let (tcp_stream, remote_addr) = match stream { Ok(stream) => stream, Err(err) => { + tcp_error_cnt.inc(); if !suppress_io_error(&err) { info!("Failed to accept TCP connection: {err:#}"); } @@ -78,11 +108,18 @@ impl Server { let tls_acceptor = self.tls_acceptor.clone(); let cancel = cancel.clone(); + let tls_error_cnt = tls_error_cnt.clone(); + let http_error_cnt = http_error_cnt.clone(); + let https_error_cnt = https_error_cnt.clone(); + let http_connection_cnt = http_connection_cnt.clone(); + let https_connection_cnt = https_connection_cnt.clone(); + connections.push(tokio::spawn( async move { match tls_acceptor { Some(tls_acceptor) => { // Handle HTTPS connection. + https_connection_cnt.inc(); let tls_stream = tokio::select! { tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream, _ = cancel.cancelled() => return, @@ -90,6 +127,7 @@ impl Server { let tls_stream = match tls_stream { Ok(tls_stream) => tls_stream, Err(err) => { + tls_error_cnt.inc(); if !suppress_io_error(&err) { info!(%remote_addr, "Failed to accept TLS connection: {err:#}"); } @@ -97,6 +135,7 @@ impl Server { } }; if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await { + https_error_cnt.inc(); if !suppress_hyper_error(&err) { info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}"); } @@ -104,7 +143,9 @@ impl Server { } None => { // Handle HTTP connection. + http_connection_cnt.inc(); if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await { + http_error_cnt.inc(); if !suppress_hyper_error(&err) { info!(%remote_addr, "Failed to serve HTTP connection: {err:#}"); } @@ -115,6 +156,7 @@ impl Server { } Some(conn) = connections.next() => { if let Err(err) = conn { + panic_error_cnt.inc(); error!("Connection panicked: {err:#}"); } } @@ -122,6 +164,7 @@ impl Server { // Wait for graceful shutdown of all connections. while let Some(conn) = connections.next().await { if let Err(err) = conn { + panic_error_cnt.inc(); error!("Connection panicked: {err:#}"); } } diff --git a/libs/http-utils/src/tls_certs.rs b/libs/http-utils/src/tls_certs.rs index 0c18d84d98..2799db78a6 100644 --- a/libs/http-utils/src/tls_certs.rs +++ b/libs/http-utils/src/tls_certs.rs @@ -3,11 +3,14 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context; use arc_swap::ArcSwap; use camino::Utf8Path; +use metrics::{IntCounterVec, UIntGaugeVec, register_int_counter_vec, register_uint_gauge_vec}; +use once_cell::sync::Lazy; use rustls::{ - pki_types::{CertificateDer, PrivateKeyDer}, + pki_types::{CertificateDer, PrivateKeyDer, UnixTime}, server::{ClientHello, ResolvesServerCert}, sign::CertifiedKey, }; +use x509_cert::der::Reader; pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result>> { let cert_data = tokio::fs::read(filename) @@ -53,6 +56,76 @@ pub async fn load_certified_key( Ok(certified_key) } +/// rustls's CertifiedKey with extra parsed fields used for metrics. +struct ParsedCertifiedKey { + certified_key: CertifiedKey, + expiration_time: UnixTime, +} + +/// Parse expiration time from an X509 certificate. +fn parse_expiration_time(cert: &CertificateDer<'_>) -> anyhow::Result { + let parsed_cert = x509_cert::der::SliceReader::new(cert) + .context("Failed to parse cerficiate")? + .decode::() + .context("Failed to parse cerficiate")?; + + Ok(UnixTime::since_unix_epoch( + parsed_cert + .tbs_certificate + .validity + .not_after + .to_unix_duration(), + )) +} + +async fn load_and_parse_certified_key( + key_filename: &Utf8Path, + cert_filename: &Utf8Path, +) -> anyhow::Result { + let certified_key = load_certified_key(key_filename, cert_filename).await?; + let expiration_time = parse_expiration_time(certified_key.end_entity_cert()?)?; + Ok(ParsedCertifiedKey { + certified_key, + expiration_time, + }) +} + +static CERT_EXPIRATION_TIME: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "tls_certs_expiration_time_seconds", + "Expiration time of the loaded certificate since unix epoch in seconds", + &["resolver_name"] + ) + .expect("failed to define a metric") +}); + +static CERT_RELOAD_STARTED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "tls_certs_reload_started_total", + "Number of certificate reload loop iterations started", + &["resolver_name"] + ) + .expect("failed to define a metric") +}); + +static CERT_RELOAD_UPDATED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "tls_certs_reload_updated_total", + "Number of times the certificate was updated to the new one", + &["resolver_name"] + ) + .expect("failed to define a metric") +}); + +static CERT_RELOAD_FAILED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "tls_certs_reload_failed_total", + "Number of times the certificate reload failed", + &["resolver_name"] + ) + .expect("failed to define a metric") +}); + /// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from /// the disk periodically. #[derive(Debug)] @@ -63,16 +136,28 @@ pub struct ReloadingCertificateResolver { impl ReloadingCertificateResolver { /// Creates a new Resolver by loading certificate and private key from FS and /// creating tokio::task to reload them with provided reload_period. + /// resolver_name is used as metric's label. pub async fn new( + resolver_name: &str, key_filename: &Utf8Path, cert_filename: &Utf8Path, reload_period: Duration, ) -> anyhow::Result> { + // Create metrics for current resolver. + let cert_expiration_time = CERT_EXPIRATION_TIME.with_label_values(&[resolver_name]); + let cert_reload_started_counter = + CERT_RELOAD_STARTED_COUNTER.with_label_values(&[resolver_name]); + let cert_reload_updated_counter = + CERT_RELOAD_UPDATED_COUNTER.with_label_values(&[resolver_name]); + let cert_reload_failed_counter = + CERT_RELOAD_FAILED_COUNTER.with_label_values(&[resolver_name]); + + let parsed_key = load_and_parse_certified_key(key_filename, cert_filename).await?; + let this = Arc::new(Self { - certified_key: ArcSwap::from_pointee( - load_certified_key(key_filename, cert_filename).await?, - ), + certified_key: ArcSwap::from_pointee(parsed_key.certified_key), }); + cert_expiration_time.set(parsed_key.expiration_time.as_secs()); tokio::spawn({ let weak_this = Arc::downgrade(&this); @@ -88,17 +173,22 @@ impl ReloadingCertificateResolver { Some(this) => this, None => break, // Resolver has been destroyed, exit. }; - match load_certified_key(&key_filename, &cert_filename).await { - Ok(new_certified_key) => { - if new_certified_key.cert == this.certified_key.load().cert { + cert_reload_started_counter.inc(); + + match load_and_parse_certified_key(&key_filename, &cert_filename).await { + Ok(parsed_key) => { + if parsed_key.certified_key.cert == this.certified_key.load().cert { tracing::debug!("Certificate has not changed since last reloading"); } else { tracing::info!("Certificate has been reloaded"); - this.certified_key.store(Arc::new(new_certified_key)); + this.certified_key.store(Arc::new(parsed_key.certified_key)); + cert_expiration_time.set(parsed_key.expiration_time.as_secs()); + cert_reload_updated_counter.inc(); } last_reload_failed = false; } Err(err) => { + cert_reload_failed_counter.inc(); // Note: Reloading certs may fail if it conflicts with the script updating // the files at the same time. Warn only if the error is persistent. if last_reload_failed { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2740f81758..250d4180f5 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -455,6 +455,7 @@ fn start_pageserver( let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api { let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new( + "main", &conf.ssl_key_file, &conf.ssl_cert_file, conf.ssl_cert_reload_period, diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 003a75faa6..6e7c5d971d 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -31,6 +31,7 @@ pub async fn task_main_https( global_timelines: Arc, ) -> anyhow::Result<()> { let cert_resolver = ReloadingCertificateResolver::new( + "main", &conf.ssl_key_file, &conf.ssl_cert_file, conf.ssl_cert_reload_period, diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 1aa9ae10ae..9358c9da4d 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -472,6 +472,7 @@ async fn async_main() -> anyhow::Result<()> { let https_listener = tcp_listener::bind(https_addr)?; let resolver = ReloadingCertificateResolver::new( + "main", &args.ssl_key_file, &args.ssl_cert_file, *args.ssl_cert_reload_period, diff --git a/test_runner/regress/test_ssl.py b/test_runner/regress/test_ssl.py index 9a7204ca17..39c94c05a9 100644 --- a/test_runner/regress/test_ssl.py +++ b/test_runner/regress/test_ssl.py @@ -1,5 +1,6 @@ import os import ssl +from datetime import datetime, timedelta import pytest import requests @@ -151,3 +152,63 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder): requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status() cur_cert = ssl.get_server_certificate(("localhost", port)) assert cur_cert == sk_cert + + +def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder): + """ + Test metrics exported from http/https server and tls cert reloader. + """ + neon_env_builder.use_https_pageserver_api = True + neon_env_builder.pageserver_config_override = "ssl_cert_reload_period='100 ms'" + env = neon_env_builder.init_start() + + env.pageserver.allowed_errors.append(".*Error reloading certificate.*") + + ps_client = env.pageserver.http_client() + + # 1. Test connection started metric. + filter_https = {"scheme": "https"} + old_https_conn_count = ( + ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0 + ) + + addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status" + requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status() + + new_https_conn_count = ( + ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0 + ) + # The counter should increase after the request, + # but it may increase by more than one because of storcon requests. + assert new_https_conn_count > old_https_conn_count + + # 2. Test tls connection error. + # Request without specified CA cert file should fail. + with pytest.raises(requests.exceptions.SSLError): + requests.get(addr) + + tls_error_cnt = ( + ps_client.get_metric_value("http_server_connection_errors_total", {"type": "tls"}) or 0 + ) + assert tls_error_cnt == 1 + + # 3. Test expiration time metric. + expiration_time = datetime.fromtimestamp( + ps_client.get_metric_value("tls_certs_expiration_time_seconds") or 0 + ) + now = datetime.now() + # neon_local generates certs valid for 100 years. + # Compare with +-1 year to not care about leap years. + assert now + timedelta(days=365 * 99) < expiration_time < now + timedelta(days=365 * 101) + + # 4. Test cert reload failed metric. + reload_error_cnt = ps_client.get_metric_value("tls_certs_reload_failed_total") + assert reload_error_cnt == 0 + + os.remove(env.pageserver.workdir / "server.crt") + + def reload_failed(): + reload_error_cnt = ps_client.get_metric_value("tls_certs_reload_failed_total") or 0 + assert reload_error_cnt > 0 + + wait_until(reload_failed)