diff --git a/Cargo.lock b/Cargo.lock index e7a0d8b965..f25e3d1574 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ "git-version", "hyper", "metrics", + "once_cell", "pageserver_api", "pageserver_client", "postgres_connection", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index ada35295f9..9e1c6377ee 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -18,6 +18,7 @@ clap.workspace = true futures.workspace = true git-version.workspace = true hyper.workspace = true +once_cell.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs index 238efdf5a8..1a2b001392 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/control_plane/attachment_service/src/lib.rs @@ -3,6 +3,7 @@ use utils::seqwait::MonotonicCounter; mod compute_hook; pub mod http; +pub mod metrics; mod node; pub mod persistence; mod reconciler; diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs index b323ae8820..db4f00644f 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/control_plane/attachment_service/src/main.rs @@ -6,6 +6,7 @@ /// use anyhow::{anyhow, Context}; use attachment_service::http::make_router; +use attachment_service::metrics::preinitialize_metrics; use attachment_service::persistence::Persistence; use attachment_service::service::{Config, Service}; use aws_config::{self, BehaviorVersion, Region}; @@ -205,6 +206,8 @@ async fn async_main() -> anyhow::Result<()> { logging::Output::Stdout, )?; + preinitialize_metrics(); + let args = Cli::parse(); tracing::info!( "version: {}, launch_timestamp: {}, build_tag {}, state at {}, listening on {}", diff --git a/control_plane/attachment_service/src/metrics.rs b/control_plane/attachment_service/src/metrics.rs new file mode 100644 index 0000000000..ffe093b9c8 --- /dev/null +++ b/control_plane/attachment_service/src/metrics.rs @@ -0,0 +1,32 @@ +use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; +use once_cell::sync::Lazy; + +pub(crate) struct ReconcilerMetrics { + pub(crate) spawned: IntCounter, + pub(crate) complete: IntCounterVec, +} + +impl ReconcilerMetrics { + // Labels used on [`Self::complete`] + pub(crate) const SUCCESS: &'static str = "ok"; + pub(crate) const ERROR: &'static str = "success"; + pub(crate) const CANCEL: &'static str = "cancel"; +} + +pub(crate) static RECONCILER: Lazy = Lazy::new(|| ReconcilerMetrics { + spawned: register_int_counter!( + "storage_controller_reconcile_spawn", + "Count of how many times we spawn a reconcile task", + ) + .expect("failed to define a metric"), + complete: register_int_counter_vec!( + "storage_controller_reconcile_complete", + "Reconciler tasks completed, broken down by success/failure/cancelled", + &["status"], + ) + .expect("failed to define a metric"), +}); + +pub fn preinitialize_metrics() { + Lazy::force(&RECONCILER); +} diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index e765dfc2ae..cdd6f76b14 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -92,6 +92,8 @@ impl TargetState { pub(crate) enum ReconcileError { #[error(transparent)] Notify(#[from] NotifyError), + #[error("Cancelled")] + Cancel, #[error(transparent)] Other(#[from] anyhow::Error), } @@ -497,6 +499,9 @@ impl Reconciler { } for (node_id, conf) in changes { + if self.cancel.is_cancelled() { + return Err(ReconcileError::Cancel); + } self.location_config(node_id, conf, None).await?; } diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 097b4a1a47..b1e66ebdad 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -807,6 +807,15 @@ impl Service { }; shard_state.generation = std::cmp::max(shard_state.generation, new_gen); + if let Some(observed) = shard_state + .observed + .locations + .get_mut(&reattach_req.node_id) + { + if let Some(conf) = observed.conf.as_mut() { + conf.generation = new_gen.into(); + } + } // TODO: cancel/restart any running reconciliation for this tenant, it might be trying // to call location_conf API with an old generation. Wait for cancellation to complete diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 1a68864091..b0ddb83f06 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use crate::metrics; use control_plane::attachment_service::NodeAvailability; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, @@ -570,6 +571,7 @@ impl TenantState { let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq, tenant_id=%reconciler.tenant_shard_id.tenant_id, shard_id=%reconciler.tenant_shard_id.shard_slug()); + metrics::RECONCILER.spawned.inc(); let join_handle = tokio::task::spawn( async move { // Wait for any previous reconcile task to complete before we start @@ -586,6 +588,10 @@ impl TenantState { // TODO: wrap all remote API operations in cancellation check // as well. if reconciler.cancel.is_cancelled() { + metrics::RECONCILER + .complete + .with_label_values(&[metrics::ReconcilerMetrics::CANCEL]) + .inc(); return; } @@ -599,6 +605,20 @@ impl TenantState { reconciler.compute_notify().await.ok(); } + // Update result counter + match &result { + Ok(_) => metrics::RECONCILER + .complete + .with_label_values(&[metrics::ReconcilerMetrics::SUCCESS]), + Err(ReconcileError::Cancel) => metrics::RECONCILER + .complete + .with_label_values(&[metrics::ReconcilerMetrics::CANCEL]), + Err(_) => metrics::RECONCILER + .complete + .with_label_values(&[metrics::ReconcilerMetrics::ERROR]), + } + .inc(); + result_tx .send(ReconcileResult { sequence: reconcile_seq, diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index 418370c3ab..f433db2167 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -4,6 +4,8 @@ from typing import Dict, List, Optional, Tuple from prometheus_client.parser import text_string_to_metric_families from prometheus_client.samples import Sample +from fixtures.log_helper import log + class Metrics: metrics: Dict[str, List[Sample]] @@ -31,6 +33,55 @@ class Metrics: return res[0] +class MetricsGetter: + """ + Mixin for types that implement a `get_metrics` function and would like associated + helpers for querying the metrics + """ + + def get_metrics(self) -> Metrics: + raise NotImplementedError() + + def get_metric_value( + self, name: str, filter: Optional[Dict[str, str]] = None + ) -> Optional[float]: + metrics = self.get_metrics() + results = metrics.query_all(name, filter=filter) + if not results: + log.info(f'could not find metric "{name}"') + return None + assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}" + return results[0].value + + def get_metrics_values( + self, names: list[str], filter: Optional[Dict[str, str]] = None + ) -> Dict[str, float]: + """ + When fetching multiple named metrics, it is more efficient to use this + than to call `get_metric_value` repeatedly. + + Throws RuntimeError if no metrics matching `names` are found, or if + not all of `names` are found: this method is intended for loading sets + of metrics whose existence is coupled. + """ + metrics = self.get_metrics() + samples = [] + for name in names: + samples.extend(metrics.query_all(name, filter=filter)) + + result = {} + for sample in samples: + if sample.name in result: + raise RuntimeError(f"Multiple values found for {sample.name}") + result[sample.name] = sample.value + + if len(result) != len(names): + log.info(f"Metrics found: {metrics.metrics}") + raise RuntimeError(f"could not find all metrics {' '.join(names)}") + + return result + + def parse_metrics(text: str, name: str = "") -> Metrics: metrics = Metrics(name) gen = text_string_to_metric_families(text) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 04af73c327..b347ff44e9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -46,6 +46,7 @@ from urllib3.util.retry import Retry from fixtures import overlayfs from fixtures.broker import NeonBroker from fixtures.log_helper import log +from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pageserver.allowed_errors import ( DEFAULT_PAGESERVER_ALLOWED_ERRORS, scan_pageserver_log_for_errors, @@ -1913,7 +1914,7 @@ class Pagectl(AbstractNeonCli): return IndexPartDump.from_json(parsed) -class NeonAttachmentService: +class NeonAttachmentService(MetricsGetter): def __init__(self, env: NeonEnv, auth_enabled: bool): self.env = env self.running = False @@ -1951,6 +1952,11 @@ class NeonAttachmentService: return headers + def get_metrics(self) -> Metrics: + res = self.request("GET", f"{self.env.attachment_service_api}/metrics") + res.raise_for_status() + return parse_metrics(res.text) + def ready(self) -> bool: resp = self.request("GET", f"{self.env.attachment_service_api}/ready") if resp.status_code == 503: diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index adea9ca764..6af3b6a912 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -12,7 +12,7 @@ from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from fixtures.log_helper import log -from fixtures.metrics import Metrics, parse_metrics +from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import Fn @@ -125,7 +125,7 @@ class TenantConfig: ) -class PageserverHttpClient(requests.Session): +class PageserverHttpClient(requests.Session, MetricsGetter): def __init__( self, port: int, @@ -721,45 +721,6 @@ class PageserverHttpClient(requests.Session): assert len(matches) < 2, "above filter should uniquely identify metric" return value - def get_metric_value( - self, name: str, filter: Optional[Dict[str, str]] = None - ) -> Optional[float]: - metrics = self.get_metrics() - results = metrics.query_all(name, filter=filter) - if not results: - log.info(f'could not find metric "{name}"') - return None - assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}" - return results[0].value - - def get_metrics_values( - self, names: list[str], filter: Optional[Dict[str, str]] = None - ) -> Dict[str, float]: - """ - When fetching multiple named metrics, it is more efficient to use this - than to call `get_metric_value` repeatedly. - - Throws RuntimeError if no metrics matching `names` are found, or if - not all of `names` are found: this method is intended for loading sets - of metrics whose existence is coupled. - """ - metrics = self.get_metrics() - samples = [] - for name in names: - samples.extend(metrics.query_all(name, filter=filter)) - - result = {} - for sample in samples: - if sample.name in result: - raise RuntimeError(f"Multiple values found for {sample.name}") - result[sample.name] = sample.value - - if len(result) != len(names): - log.info(f"Metrics found: {metrics.metrics}") - raise RuntimeError(f"could not find all metrics {' '.join(names)}") - - return result - def layer_map_info( self, tenant_id: Union[TenantId, TenantShardId], diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index fcf4b9f72a..5676727a2e 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -255,3 +255,26 @@ def test_sharding_split_smoke( env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10) workload.validate() + + # Check that we didn't do any spurious reconciliations. + # Total number of reconciles should have been one per original shard, plus + # one for each shard that was migrated. + reconcile_ok = env.attachment_service.get_metric_value( + "storage_controller_reconcile_complete_total", filter={"status": "ok"} + ) + assert reconcile_ok == shard_count + split_shard_count // 2 + + # Check that no cancelled or errored reconciliations occurred: this test does no + # failure injection and should run clean. + assert ( + env.attachment_service.get_metric_value( + "storage_controller_reconcile_complete_total", filter={"status": "cancel"} + ) + is None + ) + assert ( + env.attachment_service.get_metric_value( + "storage_controller_reconcile_complete_total", filter={"status": "error"} + ) + is None + )