chore(compute): Simplify new compute_ctl metrics and fix flaky test (#10560)

## Problem

1. d04d924 added separate metrics for total requests and for failures, which doesn't make much sense. We could just have a unified counter with an `http_status` label.
2. `test_compute_migrations_retry` had a race: it waited only for the last successful migration to be applied, not for the actual failure. This was revealed after adding an assert on the failure metric in d04d924.

## Summary of changes

1. Switch to unified counters for `compute_ctl` requests.
2. Add a waiting loop to `test_compute_migrations_retry` to eliminate the race.

Part of neondatabase/cloud#17590

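To make the first point concrete, here is a minimal, self-contained sketch of the unified-counter pattern the PR switches to, using the `prometheus` and `once_cell` crates that appear in the diff below. The `record_get_spec_result` helper and the hard-coded `"GetSpec"` label value are illustrative only, not part of the actual change.

```rust
// A minimal sketch of the unified-counter pattern, assuming the `prometheus`
// and `once_cell` crates; illustrative only, not the actual compute_ctl code.
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "compute_ctl_cplane_requests_total",
        "Total number of control plane requests made by compute_ctl by status",
        &["rpc", "http_status"]
    )
    .expect("failed to define a metric")
});

// Record one request outcome; successes and failures share the same counter
// and differ only in the `http_status` label value.
fn record_get_spec_result(http_status: &str) {
    CPLANE_REQUESTS_TOTAL
        .with_label_values(&["GetSpec", http_status])
        .inc();
}

fn main() {
    record_get_spec_result("200");
    record_get_spec_result("503");
    record_get_spec_result("unknown"); // request never got an HTTP response
    // A failure count can be derived by summing every sample whose
    // http_status is not 2xx, so no separate *_failed_total counter is needed.
}
```
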
```diff
@@ -85,7 +85,7 @@ use tracing::info;
 use tracing::log::warn;
 use zstd::stream::read::Decoder;
 
-use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
+use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
 
 fn get_pg_config(argument: &str, pgbin: &str) -> String {
     // gives the result of `pg_config [argument]`
@@ -260,22 +260,20 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
 
     info!("Download extension {:?} from uri {:?}", ext_path, uri);
 
-    REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();
-
     match do_extension_server_request(&uri).await {
         Ok(resp) => {
             info!(
                 "Successfully downloaded remote extension data {:?}",
                 ext_path
             );
+            REMOTE_EXT_REQUESTS_TOTAL
+                .with_label_values(&[&StatusCode::OK.to_string()])
+                .inc();
             Ok(resp)
         }
         Err((msg, status)) => {
-            let status_str = status
-                .map(|s| s.to_string())
-                .unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
-            REMOTE_EXT_REQUESTS_FAILED
-                .with_label_values(&[&status_str])
+            REMOTE_EXT_REQUESTS_TOTAL
+                .with_label_values(&[&status])
                 .inc();
             bail!(msg);
         }
@@ -283,12 +281,12 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
 }
 
 // Do a single remote extensions server request.
-// Return result or (error message + status code) in case of any failures.
-async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> {
+// Return result or (error message + stringified status code) in case of any failures.
+async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, String)> {
     let resp = reqwest::get(uri).await.map_err(|e| {
         (
             format!("could not perform remote extensions server request: {}", e),
-            None,
+            UNKNOWN_HTTP_STATUS.to_string(),
         )
     })?;
     let status = resp.status();
@@ -300,19 +298,19 @@ async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option
                 format!("could not read remote extensions server response: {}", e),
                 // It's fine to return and report error with status as 200 OK,
                 // because we still failed to read the response.
-                Some(status),
+                status.to_string(),
             )),
         },
         StatusCode::SERVICE_UNAVAILABLE => Err((
            "remote extensions server is temporarily unavailable".to_string(),
-            Some(status),
+            status.to_string(),
        )),
        _ => Err((
            format!(
                "unexpected remote extensions server response status code: {}",
                status
            ),
-            Some(status),
+            status.to_string(),
        )),
    }
 }
```

```diff
@@ -32,16 +32,7 @@ pub const UNKNOWN_HTTP_STATUS: &str = "unknown";
 pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         "compute_ctl_cplane_requests_total",
-        "Total number of control plane requests made by compute_ctl",
-        &["rpc"]
-    )
-    .expect("failed to define a metric")
-});
-
-pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "compute_ctl_cplane_requests_failed_total",
-        "Total number of failed control plane requests made by compute_ctl",
+        "Total number of control plane requests made by compute_ctl by status",
         &["rpc", "http_status"]
     )
     .expect("failed to define a metric")
@@ -62,18 +53,9 @@ pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
 pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         "compute_ctl_remote_ext_requests_total",
-        "Total number of requests made by compute_ctl to download extensions from S3 proxy",
+        "Total number of requests made by compute_ctl to download extensions from S3 proxy by status",
         // Do not use any labels like extension name yet.
         // We can add them later if needed.
-        &[]
-    )
-    .expect("failed to define a metric")
-});
-
-pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "compute_ctl_remote_ext_requests_failed_total",
-        "Total number of failed requests to S3 proxy",
         &["http_status"]
     )
     .expect("failed to define a metric")
@@ -82,9 +64,7 @@ pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(||
 pub fn collect() -> Vec<MetricFamily> {
     let mut metrics = INSTALLED_EXTENSIONS.collect();
     metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
-    metrics.extend(CPLANE_REQUESTS_FAILED.collect());
-    metrics.extend(DB_MIGRATION_FAILED.collect());
     metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
-    metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect());
+    metrics.extend(DB_MIGRATION_FAILED.collect());
     metrics
 }
```

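The `collect()` function above just gathers `MetricFamily` values; for reference, this is roughly how such families end up in the Prometheus text exposition format. This is generic `prometheus` crate usage shown for illustration, not code from this PR, and the sample output line in the comment is hypothetical.

```rust
// Illustrative only: encode gathered metric families into the Prometheus
// text exposition format using the generic `prometheus` crate API.
use prometheus::{Encoder, TextEncoder};

fn render_metrics(families: &[prometheus::proto::MetricFamily]) -> String {
    let encoder = TextEncoder::new();
    let mut buf = Vec::new();
    encoder
        .encode(families, &mut buf)
        .expect("failed to encode metrics");
    String::from_utf8(buf).expect("text format is valid UTF-8")
}

fn main() {
    // With the default registry this would print every registered counter, e.g.
    // compute_ctl_cplane_requests_total{rpc="GetSpec",http_status="200"} 1
    let families = prometheus::gather();
    println!("{}", render_metrics(&families));
}
```
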
```diff
@@ -6,9 +6,7 @@ use std::path::Path;
 use tracing::{error, info, instrument, warn};
 
 use crate::config;
-use crate::metrics::{
-    CPlaneRequestRPC, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS,
-};
+use crate::metrics::{CPlaneRequestRPC, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
 use crate::migration::MigrationRunner;
 use crate::params::PG_HBA_ALL_MD5;
 use crate::pg_helpers::*;
@@ -22,7 +20,7 @@ use compute_api::spec::ComputeSpec;
 fn do_control_plane_request(
     uri: &str,
     jwt: &str,
-) -> Result<ControlPlaneSpecResponse, (bool, String, Option<StatusCode>)> {
+) -> Result<ControlPlaneSpecResponse, (bool, String, String)> {
     let resp = reqwest::blocking::Client::new()
         .get(uri)
         .header("Authorization", format!("Bearer {}", jwt))
@@ -31,7 +29,7 @@ fn do_control_plane_request(
             (
                 true,
                 format!("could not perform spec request to control plane: {}", e),
-                None,
+                UNKNOWN_HTTP_STATUS.to_string(),
             )
         })?;
 
@@ -42,13 +40,13 @@ fn do_control_plane_request(
             Err(e) => Err((
                 true,
                 format!("could not deserialize control plane response: {}", e),
-                Some(status),
+                status.to_string(),
             )),
         },
         StatusCode::SERVICE_UNAVAILABLE => Err((
             true,
             "control plane is temporarily unavailable".to_string(),
-            Some(status),
+            status.to_string(),
         )),
         StatusCode::BAD_GATEWAY => {
             // We have a problem with intermittent 502 errors now
@@ -57,7 +55,7 @@ fn do_control_plane_request(
             Err((
                 true,
                 "control plane request failed with 502".to_string(),
-                Some(status),
+                status.to_string(),
             ))
         }
         // Another code, likely 500 or 404, means that compute is unknown to the control plane
@@ -65,7 +63,7 @@ fn do_control_plane_request(
         _ => Err((
             false,
             format!("unexpected control plane response status code: {}", status),
-            Some(status),
+            status.to_string(),
         )),
     }
 }
@@ -92,26 +90,28 @@ pub fn get_spec_from_control_plane(
     // - no spec for compute yet (Empty state) -> return Ok(None)
     // - got spec -> return Ok(Some(spec))
     while attempt < 4 {
-        CPLANE_REQUESTS_TOTAL
-            .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str()])
-            .inc();
         spec = match do_control_plane_request(&cp_uri, &jwt) {
-            Ok(spec_resp) => match spec_resp.status {
-                ControlPlaneComputeStatus::Empty => Ok(None),
-                ControlPlaneComputeStatus::Attached => {
-                    if let Some(spec) = spec_resp.spec {
-                        Ok(Some(spec))
-                    } else {
-                        bail!("compute is attached, but spec is empty")
+            Ok(spec_resp) => {
+                CPLANE_REQUESTS_TOTAL
+                    .with_label_values(&[
+                        CPlaneRequestRPC::GetSpec.as_str(),
+                        &StatusCode::OK.to_string(),
+                    ])
+                    .inc();
+                match spec_resp.status {
+                    ControlPlaneComputeStatus::Empty => Ok(None),
+                    ControlPlaneComputeStatus::Attached => {
+                        if let Some(spec) = spec_resp.spec {
+                            Ok(Some(spec))
+                        } else {
+                            bail!("compute is attached, but spec is empty")
+                        }
                     }
                 }
-            },
+            }
             Err((retry, msg, status)) => {
-                let status_str = status
-                    .map(|s| s.to_string())
-                    .unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
-                CPLANE_REQUESTS_FAILED
-                    .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status_str])
+                CPLANE_REQUESTS_TOTAL
+                    .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status])
                     .inc();
                 if retry {
                     Err(anyhow!(msg))
```

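The last hunk above is cut off inside the retry loop. Purely as a schematic, the surrounding control flow has roughly the shape sketched below; `fetch_spec` is a made-up stand-in for `do_control_plane_request`, and details such as attempt counting and waiting between retries are simplified, so this is not the actual `get_spec_from_control_plane` implementation.

```rust
// Schematic sketch of the retry shape around the hunk above. `fetch_spec` is a
// hypothetical stand-in for do_control_plane_request; simplified on purpose.
use anyhow::{anyhow, bail, Result};

// Returns Ok(spec) or (retryable?, error message), mirroring the (bool, String)
// part of the real error tuple.
fn fetch_spec(attempt: u32) -> std::result::Result<String, (bool, String)> {
    if attempt < 2 {
        Err((true, "control plane is temporarily unavailable".to_string()))
    } else {
        Ok("spec".to_string())
    }
}

fn get_spec_with_retries() -> Result<Option<String>> {
    let mut attempt = 0;
    let mut spec: Result<Option<String>> = Ok(None);
    while attempt < 4 {
        spec = match fetch_spec(attempt) {
            Ok(s) => Ok(Some(s)),
            Err((retry, msg)) => {
                if retry {
                    // Keep the error around and try again on the next iteration.
                    Err(anyhow!(msg))
                } else {
                    // Permanent error: give up immediately.
                    bail!(msg);
                }
            }
        };
        if spec.is_ok() {
            break;
        }
        attempt += 1;
    }
    spec
}

fn main() -> Result<()> {
    println!("{:?}", get_spec_with_retries()?);
    Ok(())
}
```
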
```diff
@@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, cast
 import pytest
 from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS
 from fixtures.metrics import parse_metrics
+from fixtures.utils import wait_until
 
 if TYPE_CHECKING:
     from fixtures.neon_fixtures import NeonEnv
@@ -24,7 +25,26 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
     for i in range(1, NUM_COMPUTE_MIGRATIONS + 1):
         endpoint.start(env={"FAILPOINTS": f"compute-migration=return({i})"})
 
-        # Make sure that the migrations ran
+        # Check that migration failure is properly recorded in the metrics
+        #
+        # N.B. wait_for_migrations() only waits till the last successful
+        # migration is applied. It doesn't wait till the migration failure due
+        # to the failpoint. This opens a race for checking the metrics. To avoid
+        # this, we first wait until the migration failure metric is seen.
+        def check_migration_failure_metrics():
+            client = endpoint.http_client()
+            raw_metrics = client.metrics()
+            metrics = parse_metrics(raw_metrics)
+            failed_migration = metrics.query_all(
+                "compute_ctl_db_migration_failed_total",
+            )
+            assert len(failed_migration) == 1
+            for sample in failed_migration:
+                assert sample.value == 1
+
+        wait_until(check_migration_failure_metrics)
+
+        # Make sure that all migrations before the failed one are applied
         endpoint.wait_for_migrations(wait_for=i - 1)
 
         # Confirm that we correctly recorded that in the
@@ -34,17 +54,6 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
             migration_id = cast("int", cur.fetchall()[0][0])
             assert migration_id == i - 1
 
-        # Check that migration failure is properly recorded in the metrics
-        client = endpoint.http_client()
-        raw_metrics = client.metrics()
-        metrics = parse_metrics(raw_metrics)
-        failed_migration = metrics.query_all(
-            "compute_ctl_db_migration_failed_total",
-        )
-        assert len(failed_migration) == 1
-        for sample in failed_migration:
-            assert sample.value == 1
-
         endpoint.stop()
 
     endpoint.start()
```

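The test above polls with `wait_until` until the failure counter becomes visible, which is what removes the race. For illustration only, the same wait-until-observed idea sketched in Rust against a hypothetical metrics URL: the helper name, port, and retry budget are made up, and the real test keeps using the Python `wait_until` fixture.

```rust
// Hypothetical sketch of "poll until the metric is observable"; the URL and
// retry policy are made up for illustration and are not part of the PR.
use std::thread::sleep;
use std::time::Duration;

fn wait_for_metric_sample(url: &str, metric: &str, attempts: u32) -> anyhow::Result<()> {
    for _ in 0..attempts {
        // Scrape the text exposition format and look for the metric by name.
        let body = reqwest::blocking::get(url)?.text()?;
        if body.lines().any(|line| line.starts_with(metric)) {
            return Ok(());
        }
        // Give the process a moment to record the failure before re-checking.
        sleep(Duration::from_millis(500));
    }
    anyhow::bail!("{metric} did not appear after {attempts} attempts")
}

fn main() -> anyhow::Result<()> {
    wait_for_metric_sample(
        "http://127.0.0.1:3080/metrics",
        "compute_ctl_db_migration_failed_total",
        20,
    )
}
```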