feat(compute): Add some basic compute_ctl metrics (#10504)

## Problem

Several parts of `compute_ctl` currently have very little visibility into
errors:
1. DB migrations that run asynchronously in the background after compute start.
2. Requests made to the control plane (currently only `GetSpec`).
3. Requests made to the remote extensions server.

## Summary of changes

Add new counters so we can quickly gauge the number of errors across the
fleet; a short sketch of the counting pattern is included below.

Part of neondatabase/cloud#17590
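
For illustration, a minimal sketch of the counting pattern these changes apply around the `GetSpec` request: one `*_total` increment per attempt, plus one `*_failed` increment labelled by HTTP status on the error path. The helper function below is hypothetical; the statics, `CPlaneRequestRPC`, and `UNKNOWN_HTTP_STATUS` are defined in the new `metrics` module in this commit.

```rust
use http::StatusCode;

// Assumes this lives inside compute_ctl's crate so the pub(crate) statics from
// the new `metrics` module are visible; the helper itself is hypothetical.
use crate::metrics::{
    CPlaneRequestRPC, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS,
};

fn count_getspec_attempt(outcome: Result<(), Option<StatusCode>>) {
    // One increment per attempt, success or failure.
    CPLANE_REQUESTS_TOTAL
        .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str()])
        .inc();

    // On failure, label the error counter with the HTTP status, falling back
    // to "unknown" when the request never produced a response.
    if let Err(status) = outcome {
        let status_str = status
            .map(|s| s.to_string())
            .unwrap_or_else(|| UNKNOWN_HTTP_STATUS.to_string());
        CPLANE_REQUESTS_FAILED
            .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status_str])
            .inc();
    }
}
```
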
Author: Alexey Kondratov
Date: 2025-01-28 20:24:07 +01:00
Committed by: GitHub
Parent: f5fdaa6dc6
Commit: d04d924649
9 changed files with 230 additions and 62 deletions

View File

@@ -85,6 +85,8 @@ use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;
use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
@@ -258,21 +260,60 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
info!("Download extension {:?} from uri {:?}", ext_path, uri);
let resp = reqwest::get(uri).await?;
REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();
match resp.status() {
match do_extension_server_request(&uri).await {
Ok(resp) => {
info!(
"Successfully downloaded remote extension data {:?}",
ext_path
);
Ok(resp)
}
Err((msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
REMOTE_EXT_REQUESTS_FAILED
.with_label_values(&[&status_str])
.inc();
bail!(msg);
}
}
}
// Do a single remote extensions server request.
// Return result or (error message + status code) in case of any failures.
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!("could not perform remote extensions server request: {}", e),
None,
)
})?;
let status = resp.status();
match status {
StatusCode::OK => match resp.bytes().await {
Ok(resp) => {
info!("Download extension {:?} completed successfully", ext_path);
Ok(resp)
}
Err(e) => bail!("could not deserialize remote extension response: {}", e),
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {}", e),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
_ => bail!(
"unexpected remote extension response status code: {}",
resp.status()
),
StatusCode::SERVICE_UNAVAILABLE => Err((
"remote extensions server is temporarily unavailable".to_string(),
Some(status),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
Some(status),
)),
}
}

View File

@@ -2,17 +2,16 @@ use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use http::StatusCode;
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use metrics::{Encoder, TextEncoder};
use crate::{http::JsonResponse, installed_extensions};
use crate::{http::JsonResponse, metrics::collect};
/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = installed_extensions::collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
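
The hunk ends at the filtering step. For context, here is a minimal sketch of the encoding step that follows it, assuming the prometheus-style `Encoder`/`TextEncoder` API re-exported by the `metrics` crate and the imports already present at the top of this file; the real handler's response construction may differ.

```rust
// Sketch only: encode the filtered families into the Prometheus text
// exposition format and return them with the matching content type.
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
encoder
    .encode(&metrics, &mut buffer)
    .expect("failed to encode metrics");

Response::builder()
    .status(StatusCode::OK)
    .header(CONTENT_TYPE, encoder.format_type())
    .body(Body::from(buffer))
    .unwrap()
```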

View File

@@ -1,13 +1,10 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;
use anyhow::Result;
use postgres::{Client, NoTls};
use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::metrics::INSTALLED_EXTENSIONS;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
@@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
extensions: extensions_map.into_values().collect(),
})
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}

View File

@@ -16,6 +16,7 @@ pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod params;

View File

@@ -0,0 +1,90 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use once_cell::sync::Lazy;
pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});
// Normally, any HTTP API request is described by METHOD (e.g. GET, POST, etc.) + PATH,
// but for all our APIs we define a 'slug'/method/operationId in the OpenAPI spec,
// so it's fair to call it an 'RPC' (Remote Procedure Call).
pub enum CPlaneRequestRPC {
GetSpec,
}
impl CPlaneRequestRPC {
pub fn as_str(&self) -> &str {
match self {
CPlaneRequestRPC::GetSpec => "GetSpec",
}
}
}
pub const UNKNOWN_HTTP_STATUS: &str = "unknown";
pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_total",
"Total number of control plane requests made by compute_ctl",
&["rpc"]
)
.expect("failed to define a metric")
});
pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_failed_total",
"Total number of failed control plane requests made by compute_ctl",
&["rpc", "http_status"]
)
.expect("failed to define a metric")
});
/// Total number of failed database migrations. Per compute, this is effectively a boolean metric:
/// it is either empty or contains a single sample (value 1, labelled with the migration_id),
/// because we stop at the first failure. Summed over the fleet, it gives the total number of failures.
pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_db_migration_failed_total",
"Total number of failed database migrations",
&["migration_id"]
)
.expect("failed to define a metric")
});
pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_total",
"Total number of requests made by compute_ctl to download extensions from S3 proxy",
// Do not use any labels like extension name yet.
// We can add them later if needed.
&[]
)
.expect("failed to define a metric")
});
pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_failed_total",
"Total number of failed requests to S3 proxy",
&["http_status"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(CPLANE_REQUESTS_FAILED.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect());
metrics
}
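
Not part of this change, but a hypothetical in-module unit test sketching how `collect()` can be sanity-checked against the families defined above:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn collect_includes_registered_families() {
        // Touch one counter so its family is non-empty, then check that it
        // shows up in the combined snapshot returned by collect().
        REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();

        let names: Vec<String> = collect()
            .iter()
            .map(|family| family.get_name().to_string())
            .collect();

        assert!(names.contains(&"compute_ctl_remote_ext_requests_total".to_string()));
    }
}
```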

View File

@@ -1,7 +1,9 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::{Client, Transaction};
use tracing::info;
use tracing::{error, info};
use crate::metrics::DB_MIGRATION_FAILED;
/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
@@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}
/// Run an individual migration
fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
/// Run an individual migration in a separate transaction block.
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);
// Even though we are skipping the migration, updating the
// migration ID keeps the logic easy to follow when inspecting
// the state of a cluster.
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);
txn.simple_query(migration)
.with_context(|| format!("apply migration {migration_id}"))?;
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
}
txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
Ok(())
}
@@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let mut txn = self
.client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
.with_context(|| format!("running migration {migration_id}"))?;
txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
info!("Finished migration id={}", migration_id);
match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Err(e) => {
error!("Failed to run migration id={}: {}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
}
current_migration += 1;
}

View File

@@ -6,6 +6,9 @@ use std::path::Path;
use tracing::{error, info, instrument, warn};
use crate::config;
use crate::metrics::{
CPlaneRequestRPC, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS,
};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -19,7 +22,7 @@ use compute_api::spec::ComputeSpec;
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
) -> Result<ControlPlaneSpecResponse, (bool, String, Option<StatusCode>)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
@@ -28,34 +31,41 @@ fn do_control_plane_request(
(
true,
format!("could not perform spec request to control plane: {}", e),
None,
)
})?;
match resp.status() {
let status = resp.status();
match status {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {}", e),
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => {
Err((true, "control plane is temporarily unavailable".to_string()))
}
StatusCode::SERVICE_UNAVAILABLE => Err((
true,
"control plane is temporarily unavailable".to_string(),
Some(status),
)),
StatusCode::BAD_GATEWAY => {
// We have a problem with intermittent 502 errors now
// https://github.com/neondatabase/cloud/issues/2353
// It's fine to retry GET request in this case.
Err((true, "control plane request failed with 502".to_string()))
Err((
true,
"control plane request failed with 502".to_string(),
Some(status),
))
}
// Another code, likely 500 or 404, means that compute is unknown to the control plane
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!(
"unexpected control plane response status code: {}",
resp.status()
),
format!("unexpected control plane response status code: {}", status),
Some(status),
)),
}
}
@@ -82,6 +92,9 @@ pub fn get_spec_from_control_plane(
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str()])
.inc();
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok(None),
@@ -93,7 +106,13 @@ pub fn get_spec_from_control_plane(
}
}
},
Err((retry, msg)) => {
Err((retry, msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
CPLANE_REQUESTS_FAILED
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status_str])
.inc();
if retry {
Err(anyhow!(msg))
} else {

View File

@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, cast
import pytest
from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS
from fixtures.metrics import parse_metrics
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@@ -33,6 +34,17 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
migration_id = cast("int", cur.fetchall()[0][0])
assert migration_id == i - 1
# Check that migration failure is properly recorded in the metrics
client = endpoint.http_client()
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
failed_migration = metrics.query_all(
"compute_ctl_db_migration_failed_total",
)
assert len(failed_migration) == 1
for sample in failed_migration:
assert sample.value == 1
endpoint.stop()
endpoint.start()

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
@@ -128,6 +129,17 @@ def test_remote_extensions(
httpserver.check()
# Check that we properly recorded downloads in the metrics
client = endpoint.http_client()
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
remote_ext_requests = metrics.query_all(
"compute_ctl_remote_ext_requests_total",
)
assert len(remote_ext_requests) == 1
for sample in remote_ext_requests:
assert sample.value == 1
# TODO
# 1. Test downloading remote library.
@@ -137,7 +149,7 @@ def test_remote_extensions(
#
# 3.Test that extension is downloaded after endpoint restart,
# when the library is used in the query.
# Run the test with mutliple simultaneous connections to an endpoint.
# Run the test with multiple simultaneous connections to an endpoint.
# to ensure that the extension is downloaded only once.
#
# 4. Test that private extensions are only downloaded when they are present in the spec.