storage: add http/https server and cert resolver metrics (#11450)

## Problem
We need to export some metrics about certs/connections to configure
alerts and make sure that all HTTP requests are gone before turning
https-only mode on.
- Closes: https://github.com/neondatabase/cloud/issues/25526

## Summary of changes
- Add started connection and connection error metrics to http/https
Server.
- Add certificate expiration time and reload metrics to
ReloadingCertificateResolver.
This commit is contained in:
Dmitrii Kovalkov
2025-04-11 10:11:35 +04:00
committed by GitHub
parent 342607473a
commit 4c4e33bc2e
8 changed files with 207 additions and 8 deletions

1
Cargo.lock generated
View File

@@ -2837,6 +2837,7 @@ dependencies = [
"utils", "utils",
"uuid", "uuid",
"workspace_hack", "workspace_hack",
"x509-cert",
] ]
[[package]] [[package]]

View File

@@ -30,6 +30,7 @@ tokio.workspace = true
tracing.workspace = true tracing.workspace = true
url.workspace = true url.workspace = true
uuid.workspace = true uuid.workspace = true
x509-cert.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream # to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed. # why is it only here? no other crate should use it, streams are rarely needed.

View File

@@ -4,6 +4,8 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use hyper0::Body; use hyper0::Body;
use hyper0::server::conn::Http; use hyper0::server::conn::Http;
use metrics::{IntCounterVec, register_int_counter_vec};
use once_cell::sync::Lazy;
use routerify::{RequestService, RequestServiceBuilder}; use routerify::{RequestService, RequestServiceBuilder};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor; use tokio_rustls::TlsAcceptor;
@@ -26,6 +28,24 @@ pub struct Server {
tls_acceptor: Option<TlsAcceptor>, tls_acceptor: Option<TlsAcceptor>,
} }
/// Counter of connections the server started handling, partitioned by the
/// `scheme` label (`"http"` or `"https"`).
///
/// For HTTPS the counter is bumped before the TLS handshake is attempted, so
/// it also includes connections whose handshake later fails.
static CONNECTION_STARTED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"http_server_connection_started_total",
"Number of established http/https connections",
&["scheme"]
)
.expect("failed to define a metric")
});
/// Counter of connection-level errors, partitioned by the `type` label.
///
/// The server uses the label values `"tcp"` (accept failed), `"tls"`
/// (handshake failed), `"http"` / `"https"` (serving the connection failed)
/// and `"panic"` (the connection task panicked).
static CONNECTION_ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "http_server_connection_errors_total",
        "Number of occurred connection errors by type",
        &["type"]
    )
    .expect("failed to define a metric")
});
impl Server { impl Server {
pub fn new( pub fn new(
request_service: Arc<RequestServiceBuilder<Body, ApiError>>, request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
@@ -60,6 +80,15 @@ impl Server {
false false
} }
let tcp_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tcp"]);
let tls_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tls"]);
let http_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["http"]);
let https_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["https"]);
let panic_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["panic"]);
let http_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["http"]);
let https_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["https"]);
let mut connections = FuturesUnordered::new(); let mut connections = FuturesUnordered::new();
loop { loop {
tokio::select! { tokio::select! {
@@ -67,6 +96,7 @@ impl Server {
let (tcp_stream, remote_addr) = match stream { let (tcp_stream, remote_addr) = match stream {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
tcp_error_cnt.inc();
if !suppress_io_error(&err) { if !suppress_io_error(&err) {
info!("Failed to accept TCP connection: {err:#}"); info!("Failed to accept TCP connection: {err:#}");
} }
@@ -78,11 +108,18 @@ impl Server {
let tls_acceptor = self.tls_acceptor.clone(); let tls_acceptor = self.tls_acceptor.clone();
let cancel = cancel.clone(); let cancel = cancel.clone();
let tls_error_cnt = tls_error_cnt.clone();
let http_error_cnt = http_error_cnt.clone();
let https_error_cnt = https_error_cnt.clone();
let http_connection_cnt = http_connection_cnt.clone();
let https_connection_cnt = https_connection_cnt.clone();
connections.push(tokio::spawn( connections.push(tokio::spawn(
async move { async move {
match tls_acceptor { match tls_acceptor {
Some(tls_acceptor) => { Some(tls_acceptor) => {
// Handle HTTPS connection. // Handle HTTPS connection.
https_connection_cnt.inc();
let tls_stream = tokio::select! { let tls_stream = tokio::select! {
tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream, tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream,
_ = cancel.cancelled() => return, _ = cancel.cancelled() => return,
@@ -90,6 +127,7 @@ impl Server {
let tls_stream = match tls_stream { let tls_stream = match tls_stream {
Ok(tls_stream) => tls_stream, Ok(tls_stream) => tls_stream,
Err(err) => { Err(err) => {
tls_error_cnt.inc();
if !suppress_io_error(&err) { if !suppress_io_error(&err) {
info!(%remote_addr, "Failed to accept TLS connection: {err:#}"); info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
} }
@@ -97,6 +135,7 @@ impl Server {
} }
}; };
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await { if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
https_error_cnt.inc();
if !suppress_hyper_error(&err) { if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}"); info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
} }
@@ -104,7 +143,9 @@ impl Server {
} }
None => { None => {
// Handle HTTP connection. // Handle HTTP connection.
http_connection_cnt.inc();
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await { if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
http_error_cnt.inc();
if !suppress_hyper_error(&err) { if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}"); info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
} }
@@ -115,6 +156,7 @@ impl Server {
} }
Some(conn) = connections.next() => { Some(conn) = connections.next() => {
if let Err(err) = conn { if let Err(err) = conn {
panic_error_cnt.inc();
error!("Connection panicked: {err:#}"); error!("Connection panicked: {err:#}");
} }
} }
@@ -122,6 +164,7 @@ impl Server {
// Wait for graceful shutdown of all connections. // Wait for graceful shutdown of all connections.
while let Some(conn) = connections.next().await { while let Some(conn) = connections.next().await {
if let Err(err) = conn { if let Err(err) = conn {
panic_error_cnt.inc();
error!("Connection panicked: {err:#}"); error!("Connection panicked: {err:#}");
} }
} }

View File

@@ -3,11 +3,14 @@ use std::{sync::Arc, time::Duration};
use anyhow::Context; use anyhow::Context;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use camino::Utf8Path; use camino::Utf8Path;
use metrics::{IntCounterVec, UIntGaugeVec, register_int_counter_vec, register_uint_gauge_vec};
use once_cell::sync::Lazy;
use rustls::{ use rustls::{
pki_types::{CertificateDer, PrivateKeyDer}, pki_types::{CertificateDer, PrivateKeyDer, UnixTime},
server::{ClientHello, ResolvesServerCert}, server::{ClientHello, ResolvesServerCert},
sign::CertifiedKey, sign::CertifiedKey,
}; };
use x509_cert::der::Reader;
pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> { pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let cert_data = tokio::fs::read(filename) let cert_data = tokio::fs::read(filename)
@@ -53,6 +56,76 @@ pub async fn load_certified_key(
Ok(certified_key) Ok(certified_key)
} }
/// rustls's CertifiedKey with extra parsed fields used for metrics.
struct ParsedCertifiedKey {
// The key/certificate-chain pair as loaded from disk.
certified_key: CertifiedKey,
// Expiration time parsed from the end-entity certificate; exported through
// the certificate expiration gauge.
expiration_time: UnixTime,
}
/// Parse expiration time from an X509 certificate.
///
/// Decodes the DER-encoded certificate and returns its `notAfter` validity
/// bound expressed as time since the unix epoch.
///
/// # Errors
///
/// Returns an error if a DER reader cannot be created over the certificate
/// bytes or if the bytes do not decode as an X509 certificate.
fn parse_expiration_time(cert: &CertificateDer<'_>) -> anyhow::Result<UnixTime> {
    let parsed_cert = x509_cert::der::SliceReader::new(cert)
        .context("Failed to parse certificate")?
        .decode::<x509_cert::Certificate>()
        .context("Failed to parse certificate")?;

    Ok(UnixTime::since_unix_epoch(
        parsed_cert
            .tbs_certificate
            .validity
            .not_after
            .to_unix_duration(),
    ))
}
/// Loads the key/certificate pair from the given files and extracts the
/// expiration time of its end-entity certificate.
///
/// Fails if the pair cannot be loaded, if there is no end-entity certificate,
/// or if that certificate cannot be parsed.
async fn load_and_parse_certified_key(
    key_filename: &Utf8Path,
    cert_filename: &Utf8Path,
) -> anyhow::Result<ParsedCertifiedKey> {
    let loaded = load_certified_key(key_filename, cert_filename).await?;

    // The borrow of `loaded` ends once the expiration time has been extracted,
    // so the key can be moved into the result afterwards.
    let leaf_cert = loaded.end_entity_cert()?;
    let not_after = parse_expiration_time(leaf_cert)?;

    Ok(ParsedCertifiedKey {
        certified_key: loaded,
        expiration_time: not_after,
    })
}
/// Gauge holding the expiration time (seconds since the unix epoch) of the
/// currently loaded certificate, partitioned by the `resolver_name` label.
///
/// Set after the initial load and every time the certificate is replaced.
static CERT_EXPIRATION_TIME: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"tls_certs_expiration_time_seconds",
"Expiration time of the loaded certificate since unix epoch in seconds",
&["resolver_name"]
)
.expect("failed to define a metric")
});
/// Counter of reload attempts, partitioned by the `resolver_name` label.
///
/// Incremented at the start of every reload iteration, before the files are
/// read.
static CERT_RELOAD_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_started_total",
"Number of certificate reload loop iterations started",
&["resolver_name"]
)
.expect("failed to define a metric")
});
/// Counter of successful certificate replacements, partitioned by the
/// `resolver_name` label.
///
/// Incremented only when the freshly loaded certificate differs from the one
/// currently served and has been stored in its place.
static CERT_RELOAD_UPDATED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_updated_total",
"Number of times the certificate was updated to the new one",
&["resolver_name"]
)
.expect("failed to define a metric")
});
/// Counter of failed reload attempts, partitioned by the `resolver_name`
/// label.
///
/// Incremented whenever loading or parsing the key/certificate pair returns an
/// error during a reload iteration.
static CERT_RELOAD_FAILED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_failed_total",
"Number of times the certificate reload failed",
&["resolver_name"]
)
.expect("failed to define a metric")
});
/// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from /// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from
/// the disk periodically. /// the disk periodically.
#[derive(Debug)] #[derive(Debug)]
@@ -63,16 +136,28 @@ pub struct ReloadingCertificateResolver {
impl ReloadingCertificateResolver { impl ReloadingCertificateResolver {
/// Creates a new Resolver by loading certificate and private key from FS and /// Creates a new Resolver by loading certificate and private key from FS and
/// creating tokio::task to reload them with provided reload_period. /// creating tokio::task to reload them with provided reload_period.
/// resolver_name is used as metric's label.
pub async fn new( pub async fn new(
resolver_name: &str,
key_filename: &Utf8Path, key_filename: &Utf8Path,
cert_filename: &Utf8Path, cert_filename: &Utf8Path,
reload_period: Duration, reload_period: Duration,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
// Create metrics for current resolver.
let cert_expiration_time = CERT_EXPIRATION_TIME.with_label_values(&[resolver_name]);
let cert_reload_started_counter =
CERT_RELOAD_STARTED_COUNTER.with_label_values(&[resolver_name]);
let cert_reload_updated_counter =
CERT_RELOAD_UPDATED_COUNTER.with_label_values(&[resolver_name]);
let cert_reload_failed_counter =
CERT_RELOAD_FAILED_COUNTER.with_label_values(&[resolver_name]);
let parsed_key = load_and_parse_certified_key(key_filename, cert_filename).await?;
let this = Arc::new(Self { let this = Arc::new(Self {
certified_key: ArcSwap::from_pointee( certified_key: ArcSwap::from_pointee(parsed_key.certified_key),
load_certified_key(key_filename, cert_filename).await?,
),
}); });
cert_expiration_time.set(parsed_key.expiration_time.as_secs());
tokio::spawn({ tokio::spawn({
let weak_this = Arc::downgrade(&this); let weak_this = Arc::downgrade(&this);
@@ -88,17 +173,22 @@ impl ReloadingCertificateResolver {
Some(this) => this, Some(this) => this,
None => break, // Resolver has been destroyed, exit. None => break, // Resolver has been destroyed, exit.
}; };
match load_certified_key(&key_filename, &cert_filename).await { cert_reload_started_counter.inc();
Ok(new_certified_key) => {
if new_certified_key.cert == this.certified_key.load().cert { match load_and_parse_certified_key(&key_filename, &cert_filename).await {
Ok(parsed_key) => {
if parsed_key.certified_key.cert == this.certified_key.load().cert {
tracing::debug!("Certificate has not changed since last reloading"); tracing::debug!("Certificate has not changed since last reloading");
} else { } else {
tracing::info!("Certificate has been reloaded"); tracing::info!("Certificate has been reloaded");
this.certified_key.store(Arc::new(new_certified_key)); this.certified_key.store(Arc::new(parsed_key.certified_key));
cert_expiration_time.set(parsed_key.expiration_time.as_secs());
cert_reload_updated_counter.inc();
} }
last_reload_failed = false; last_reload_failed = false;
} }
Err(err) => { Err(err) => {
cert_reload_failed_counter.inc();
// Note: Reloading certs may fail if it conflicts with the script updating // Note: Reloading certs may fail if it conflicts with the script updating
// the files at the same time. Warn only if the error is persistent. // the files at the same time. Warn only if the error is persistent.
if last_reload_failed { if last_reload_failed {

View File

@@ -455,6 +455,7 @@ fn start_pageserver(
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
{ {
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new( let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file, &conf.ssl_key_file,
&conf.ssl_cert_file, &conf.ssl_cert_file,
conf.ssl_cert_reload_period, conf.ssl_cert_reload_period,

View File

@@ -31,6 +31,7 @@ pub async fn task_main_https(
global_timelines: Arc<GlobalTimelines>, global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let cert_resolver = ReloadingCertificateResolver::new( let cert_resolver = ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file, &conf.ssl_key_file,
&conf.ssl_cert_file, &conf.ssl_cert_file,
conf.ssl_cert_reload_period, conf.ssl_cert_reload_period,

View File

@@ -472,6 +472,7 @@ async fn async_main() -> anyhow::Result<()> {
let https_listener = tcp_listener::bind(https_addr)?; let https_listener = tcp_listener::bind(https_addr)?;
let resolver = ReloadingCertificateResolver::new( let resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file, &args.ssl_key_file,
&args.ssl_cert_file, &args.ssl_cert_file,
*args.ssl_cert_reload_period, *args.ssl_cert_reload_period,

View File

@@ -1,5 +1,6 @@
import os import os
import ssl import ssl
from datetime import datetime, timedelta
import pytest import pytest
import requests import requests
@@ -151,3 +152,63 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status() requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
cur_cert = ssl.get_server_certificate(("localhost", port)) cur_cert = ssl.get_server_certificate(("localhost", port))
assert cur_cert == sk_cert assert cur_cert == sk_cert
def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
    """
    Check the metrics exposed by the http/https server and by the
    reloading TLS certificate resolver of the pageserver.
    """
    neon_env_builder.use_https_pageserver_api = True
    neon_env_builder.pageserver_config_override = "ssl_cert_reload_period='100 ms'"
    env = neon_env_builder.init_start()

    # The test removes the certificate file on purpose, so reload errors are expected.
    env.pageserver.allowed_errors.append(".*Error reloading certificate.*")

    ps_client = env.pageserver.http_client()

    def https_connections_started():
        return (
            ps_client.get_metric_value("http_server_connection_started_total", {"scheme": "https"})
            or 0
        )

    # 1. Test connection started metric.
    count_before = https_connections_started()

    addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
    requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()

    count_after = https_connections_started()
    # Storcon may issue its own requests in the meantime, so only require
    # a strict increase rather than an increment of exactly one.
    assert count_after > count_before

    # 2. Test tls connection error.
    # Without the CA file the client must reject the server certificate.
    with pytest.raises(requests.exceptions.SSLError):
        requests.get(addr)

    tls_errors = ps_client.get_metric_value("http_server_connection_errors_total", {"type": "tls"})
    assert (tls_errors or 0) == 1

    # 3. Test expiration time metric.
    expiration_ts = ps_client.get_metric_value("tls_certs_expiration_time_seconds") or 0
    expiration_time = datetime.fromtimestamp(expiration_ts)
    now = datetime.now()
    # neon_local generates certs valid for 100 years.
    # A +-1 year window keeps leap years from mattering.
    earliest = now + timedelta(days=365 * 99)
    latest = now + timedelta(days=365 * 101)
    assert earliest < expiration_time < latest

    # 4. Test cert reload failed metric.
    assert ps_client.get_metric_value("tls_certs_reload_failed_total") == 0

    os.remove(env.pageserver.workdir / "server.crt")

    def reload_failed():
        failures = ps_client.get_metric_value("tls_certs_reload_failed_total") or 0
        assert failures > 0

    wait_until(reload_failed)