[BRC-2368] Add PS and compute_ctl metrics to report pagestream request errors (#12716)

## Problem

In our experience running the system so far, almost all "hung compute"
situations are due to the compute (postgres) pointing at the wrong
pageservers. We currently rely mainly on the Prometheus exporter
(PGExporter) running on PG to detect and report downtime, but it can be
unreliable: the read and write probes PGExporter runs do not always
generate pageserver requests due to caching, so a real user touching
uncached pages may be experiencing downtime that the probes never see.

We are also about to start disk-wiping node pool rotations in prod
clusters for our pageservers, and it is critical to have a convenient
way to monitor the impact of these rotations so that we can respond
quickly to any issues. These metrics should provide a clear signal for
that operational need.

## Summary of changes

Added a pair of metrics to detect issues in postgres' PageStream
protocol communications with pageservers (e.g. get_page_at_lsn,
get_base_backup); a sketch of reading both counters follows the list:
* On the compute node (compute_ctl), export a counter metric that is
incremented every time postgres requests a configuration refresh.
Today, postgres only requests a configuration refresh when it cannot
connect to a pageserver or when a pageserver rejects its request by
disconnecting.
* On the pageserver, export a counter metric that is incremented every
time it receives a PageStream request it cannot handle because the
tenant is not known to it or because the request was routed to the
wrong shard (e.g. a secondary location).
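
Both counters surface in the regular Prometheus text format on the
respective `/metrics` endpoints. Here is a minimal sketch of scraping
them directly; the URLs below are illustrative assumptions, and the
integration tests further down instead use the test fixtures to locate
the real addresses:

```python
# Minimal sketch: read the two new counters from their /metrics endpoints.
# The URLs are illustrative assumptions, not defaults of any deployment.
import requests
from prometheus_client.parser import text_string_to_metric_families


def read_counter(metrics_url: str, name: str) -> float:
    """Scrape a Prometheus text-format endpoint and return one counter's value."""
    text = requests.get(metrics_url, timeout=5).text
    for family in text_string_to_metric_families(text):
        for sample in family.samples:
            if sample.name == name:
                return sample.value
    return 0.0


# compute_ctl: bumped each time postgres asks for a configuration refresh.
errors = read_counter(
    "http://localhost:3080/metrics", "pg_cctl_pagestream_request_errors_total"
)
# pageserver: bumped for each PageStream request from an unknown tenant or
# one routed to the wrong shard.
misrouted = read_counter(
    "http://localhost:9898/metrics", "pageserver_misrouted_pagestream_requests_total"
)
print(f"compute errors={errors}, misrouted={misrouted}")
```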

### How I plan to use metrics
I plan to use the metrics added here to create alerts. The alerts can
fire, for example, if these counters have been continuously increasing
for over a certain period of time. During rollouts, misrouted requests
may occasionally happen, but they should soon die down as
reconfigurations make progress. We can start with something like raising
the alert if the counters have been increasing continuously for over 5
minutes.
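
As a sketch of that condition (in Prometheus terms, something like an
`increase(...[1m]) > 0` expression combined with a `for: 5m` clause),
the "increasing continuously for the whole window" check looks roughly
like this; the helper and its thresholds are illustrative assumptions,
not code from this PR:

```python
def continuously_increasing(
    samples: list[tuple[float, float]], window_sec: float = 300.0
) -> bool:
    """Return True if a counter strictly increased between every pair of
    consecutive scrapes in the trailing window.

    `samples` holds (unix_timestamp, counter_value) pairs, oldest first.
    """
    if not samples:
        return False
    cutoff = samples[-1][0] - window_sec
    recent = [(t, v) for t, v in samples if t >= cutoff]
    # Require the retained samples to actually cover the window; two points
    # scraped seconds apart should not be enough to page anyone.
    if len(recent) < 2 or recent[0][0] > cutoff + 60.0:
        return False
    return all(later > earlier for (_, earlier), (_, later) in zip(recent, recent[1:]))


# Example: scrape once a minute and alert after 5 minutes of steady growth.
# if continuously_increasing(samples):
#     page_oncall()  # hypothetical alerting hook
```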

## How is this tested?

New integration tests in
`test_runner/regress/test_hadron_ps_connectivity_metrics.py`

Co-authored-by: William Huang <william.huang@databricks.com>

Commit 512210bb5a (parent 9eebd6fc79)
Author: Tristan Partin, 2025-07-24 14:05:00 -05:00
Committed by: GitHub
7 changed files with 157 additions and 8 deletions


```diff
@@ -49,10 +49,10 @@ use compute_tools::compute::{
     BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
 };
 use compute_tools::extension_server::get_pg_version_string;
-use compute_tools::logger::*;
 use compute_tools::params::*;
 use compute_tools::pg_isready::get_pg_isready_bin;
 use compute_tools::spec::*;
+use compute_tools::{hadron_metrics, installed_extensions, logger::*};
 use rlimit::{Resource, setrlimit};
 use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
 use signal_hook::iterator::Signals;
@@ -205,6 +205,9 @@ fn main() -> Result<()> {
     // enable core dumping for all child processes
     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

+    installed_extensions::initialize_metrics();
+    hadron_metrics::initialize_metrics();
+
     let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     let config = get_config(&cli)?;
```


```diff
@@ -13,6 +13,7 @@ use metrics::{Encoder, TextEncoder};
 use crate::communicator_socket_client::connect_communicator_socket;
 use crate::compute::ComputeNode;
+use crate::hadron_metrics;
 use crate::http::JsonResponse;
 use crate::metrics::collect;
@@ -21,11 +22,18 @@ pub(in crate::http) async fn get_metrics() -> Response {
     // When we call TextEncoder::encode() below, it will immediately return an
     // error if a metric family has no metrics, so we need to preemptively
     // filter out metric families with no metrics.
-    let metrics = collect()
+    let mut metrics = collect()
         .into_iter()
         .filter(|m| !m.get_metric().is_empty())
         .collect::<Vec<MetricFamily>>();
+
+    // Add Hadron metrics.
+    let hadron_metrics: Vec<MetricFamily> = hadron_metrics::collect()
+        .into_iter()
+        .filter(|m| !m.get_metric().is_empty())
+        .collect();
+    metrics.extend(hadron_metrics);

     let encoder = TextEncoder::new();
     let mut buffer = vec![];
```


```diff
@@ -9,7 +9,7 @@ use axum::{
 use http::StatusCode;

 use crate::compute::ComputeNode;
-// use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
+use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
 use crate::http::JsonResponse;

 /// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
@@ -21,6 +21,7 @@ use crate::http::JsonResponse;
 pub(in crate::http) async fn refresh_configuration(
     State(compute): State<Arc<ComputeNode>>,
 ) -> Response {
+    POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
     match compute.signal_refresh_configuration().await {
         Ok(_) => StatusCode::OK.into_response(),
         Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
```


```diff
@@ -535,6 +535,7 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper
             match resolved {
                 ShardResolveResult::Found(tenant_shard) => break tenant_shard,
                 ShardResolveResult::NotFound => {
+                    MISROUTED_PAGESTREAM_REQUESTS.inc();
                     return Err(GetActiveTimelineError::Tenant(
                         GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
                     ));
```


```diff
@@ -587,7 +587,9 @@ class NeonLocalCli(AbstractNeonCli):
         ]
         extra_env_vars = env or {}
         if basebackup_request_tries is not None:
-            extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
+            extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = str(
+                basebackup_request_tries
+            )
         if remote_ext_base_url is not None:
             args.extend(["--remote-ext-base-url", remote_ext_base_url])
@@ -623,6 +625,7 @@ class NeonLocalCli(AbstractNeonCli):
         pageserver_id: int | None = None,
         safekeepers: list[int] | None = None,
         check_return_code=True,
+        timeout_sec: float | None = None,
     ) -> subprocess.CompletedProcess[str]:
         args = ["endpoint", "reconfigure", endpoint_id]
         if tenant_id is not None:
@@ -631,7 +634,7 @@ class NeonLocalCli(AbstractNeonCli):
             args.extend(["--pageserver-id", str(pageserver_id)])
         if safekeepers is not None:
             args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
-        return self.raw_cli(args, check_return_code=check_return_code)
+        return self.raw_cli(args, check_return_code=check_return_code, timeout=timeout_sec)

     def endpoint_refresh_configuration(
         self,
```


```diff
@@ -4930,7 +4930,12 @@ class Endpoint(PgProtocol, LogUtils):
     def is_running(self):
         return self._running._value > 0

-    def reconfigure(self, pageserver_id: int | None = None, safekeepers: list[int] | None = None):
+    def reconfigure(
+        self,
+        pageserver_id: int | None = None,
+        safekeepers: list[int] | None = None,
+        timeout_sec: float = 120,
+    ):
         assert self.endpoint_id is not None
         # If `safekeepers` is not None, remember them as active and use them
         # in the following commands.
@@ -4941,11 +4946,15 @@ class Endpoint(PgProtocol, LogUtils):
         while True:
             try:
                 self.env.neon_cli.endpoint_reconfigure(
-                    self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
+                    self.endpoint_id,
+                    self.tenant_id,
+                    pageserver_id,
+                    self.active_safekeepers,
+                    timeout_sec=timeout_sec,
                 )
                 return
             except RuntimeError as e:
-                if time.time() - start_time > 120:
+                if time.time() - start_time > timeout_sec:
                     raise e
                 log.warning(f"Reconfigure failed with error: {e}. Retrying...")
                 time.sleep(5)
```


New test file `test_runner/regress/test_hadron_ps_connectivity_metrics.py` (124 lines added):

```python
import json
import shutil

from fixtures.common_types import TenantShardId
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver


# Helper function to attempt reconfiguration of the compute to point to a new pageserver. Note that
# in these tests, we don't expect the reconfiguration attempts to go through, as we will be pointing
# the compute at a "wrong" pageserver.
def _attempt_reconfiguration(endpoint: Endpoint, new_pageserver_id: int, timeout_sec: float):
    try:
        endpoint.reconfigure(pageserver_id=new_pageserver_id, timeout_sec=timeout_sec)
    except Exception as e:
        log.info(f"reconfiguration failed with exception {e}")


def read_misrouted_metric_value(pageserver: NeonPageserver) -> float:
    return (
        pageserver.http_client()
        .get_metrics()
        .query_one("pageserver_misrouted_pagestream_requests_total")
        .value
    )


def read_request_error_metric_value(endpoint: Endpoint) -> float:
    return (
        parse_metrics(endpoint.http_client().metrics())
        .query_one("pg_cctl_pagestream_request_errors_total")
        .value
    )


def test_misrouted_to_secondary(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Tests that the following metrics are incremented when compute tries to talk to a secondary
    pageserver:
    - On the pageserver receiving the request: pageserver_misrouted_pagestream_requests_total
    - On the compute: pg_cctl_pagestream_request_errors_total
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()
    env.broker.start()
    env.storage_controller.start()
    for ps in env.pageservers:
        ps.start()
    for sk in env.safekeepers:
        sk.start()

    # Create a tenant that has one primary and one secondary. Due to primary/secondary placement
    # constraints, the primary and secondary pageservers will be different.
    tenant_id, _ = env.create_tenant(shard_count=1, placement_policy=json.dumps({"Attached": 1}))
    endpoint = env.endpoints.create(
        "main", tenant_id=tenant_id, config_lines=["neon.lakebase_mode = true"]
    )
    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()

    # Identify the primary pageserver serving the zero shard of the tenant; the other pageserver
    # holds the secondary location. The reconfiguration below is issued directly to the compute
    # and does not go through the storage controller, so nothing corrects the compute's routing.
    tenant_zero_shard = TenantShardId(tenant_id, shard_number=0, shard_count=1)
    primary_ps = env.get_tenant_pageserver(tenant_zero_shard)
    secondary_ps = (
        env.pageservers[1] if primary_ps.id == env.pageservers[0].id else env.pageservers[0]
    )

    # Now try to point the compute at the pageserver that is acting as secondary for the tenant.
    # Test that the metrics on both compute_ctl and the pageserver register the misrouted requests
    # following the reconfiguration attempt.
    assert read_misrouted_metric_value(secondary_ps) == 0
    assert read_request_error_metric_value(endpoint) == 0
    _attempt_reconfiguration(endpoint, new_pageserver_id=secondary_ps.id, timeout_sec=2.0)
    assert read_misrouted_metric_value(secondary_ps) > 0, "PS metric not incremented"
    assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"


def test_misrouted_to_ps_not_hosting_tenant(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Tests that the following metrics are incremented when compute tries to talk to a pageserver
    that does not host the tenant:
    - On the pageserver receiving the request: pageserver_misrouted_pagestream_requests_total
    - On the compute: pg_cctl_pagestream_request_errors_total
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()
    env.broker.start()
    env.storage_controller.start(handle_ps_local_disk_loss=False)
    for ps in env.pageservers:
        ps.start()
    for sk in env.safekeepers:
        sk.start()

    tenant_id, _ = env.create_tenant(shard_count=1)
    endpoint = env.endpoints.create(
        "main", tenant_id=tenant_id, config_lines=["neon.lakebase_mode = true"]
    )
    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()

    tenant_ps_id = env.get_tenant_pageserver(
        TenantShardId(tenant_id, shard_number=0, shard_count=1)
    ).id
    non_hosting_ps = (
        env.pageservers[1] if tenant_ps_id == env.pageservers[0].id else env.pageservers[0]
    )

    # Clear the disk of the non-hosting PS to make sure that it indeed doesn't have any
    # information about the tenant.
    non_hosting_ps.stop(immediate=True)
    shutil.rmtree(non_hosting_ps.tenant_dir())
    non_hosting_ps.start()

    # Now try to point the compute at the non-hosting pageserver. Test that the metrics on both
    # compute_ctl and the pageserver register the misrouted requests following the reconfiguration
    # attempt.
    assert read_misrouted_metric_value(non_hosting_ps) == 0
    assert read_request_error_metric_value(endpoint) == 0
    _attempt_reconfiguration(endpoint, new_pageserver_id=non_hosting_ps.id, timeout_sec=2.0)
    assert read_misrouted_metric_value(non_hosting_ps) > 0, "PS metric not incremented"
    assert read_request_error_metric_value(endpoint) > 0, "compute_ctl metric not incremented"
```