From 557127550c291ebab338d5c8c723e5ae5c2bf7ff Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Tue, 1 Apr 2025 10:51:17 +0200 Subject: [PATCH] feat(compute): Add compute_ctl_up metric (#11376) ## Problem For computes running inside NeonVM, the actual compute image tag is buried inside the NeonVM spec, and we cannot get it as part of standard k8s container metrics (it's always an image and a tag of the NeonVM runner container). The workaround we currently use is to extract the running computes info from the control plane database with SQL. It has several drawbacks: i) it's complicated, separate DB per region; ii) it's slow; iii) it's still an indirect source of info, i.e. k8s state could be different from what the control plane expects. ## Summary of changes Add a new `compute_ctl_up` gauge metric with `build_tag` and `status` labels. It will help us to both overview what are the tags/versions of all running computes; and to break them down by current status (`empty`, `running`, `failed`, etc.) Later, we could introduce low cardinality (no endpoint or compute ids) streaming aggregates for such metrics, so they will be blazingly fast and usable for monitoring the fleet-wide state. --- compute_tools/src/bin/compute_ctl.rs | 21 ++++------- compute_tools/src/compute.rs | 37 ++++++++++++++----- .../src/http/routes/extension_server.rs | 4 +- compute_tools/src/metrics.rs | 16 +++++++- test_runner/fixtures/endpoint/http.py | 3 ++ test_runner/fixtures/neon_fixtures.py | 6 +-- .../regress/test_compute_reconfigure.py | 19 ++++++++++ 7 files changed, 76 insertions(+), 30 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index fc7a3e2827..da11ac2860 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -45,7 +45,9 @@ use anyhow::{Context, Result}; use clap::Parser; use compute_api::responses::ComputeCtlConfig; use compute_api::spec::ComputeSpec; -use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal}; +use compute_tools::compute::{ + BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal, +}; use compute_tools::extension_server::get_pg_version_string; use compute_tools::logger::*; use compute_tools::params::*; @@ -57,10 +59,6 @@ use tracing::{error, info}; use url::Url; use utils::failpoint_support; -// this is an arbitrary build tag. Fine as a default / for testing purposes -// in-case of not-set environment var -const BUILD_TAG_DEFAULT: &str = "latest"; - // Compatibility hack: if the control plane specified any remote-ext-config // use the default value for extension storage proxy gateway. // Remove this once the control plane is updated to pass the gateway URL @@ -147,7 +145,7 @@ fn main() -> Result<()> { .build()?; let _rt_guard = runtime.enter(); - let build_tag = runtime.block_on(init())?; + runtime.block_on(init())?; // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; @@ -174,8 +172,6 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, - build_tag, - live_config_allowed: cli_spec.live_config_allowed, }, cli_spec.spec, @@ -189,7 +185,7 @@ fn main() -> Result<()> { deinit_and_exit(exit_code); } -async fn init() -> Result { +async fn init() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?; let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; @@ -199,12 +195,9 @@ async fn init() -> Result { } }); - let build_tag = option_env!("BUILD_TAG") - .unwrap_or(BUILD_TAG_DEFAULT) - .to_string(); - info!("build_tag: {build_tag}"); + info!("compute build_tag: {}", &BUILD_TAG.to_string()); - Ok(build_tag) + Ok(()) } fn try_spec_from_cli(cli: &Cli) -> Result { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 4126835c1a..f27bf164ae 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,6 +20,7 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; +use once_cell::sync::Lazy; use postgres; use postgres::NoTls; use postgres::error::SqlState; @@ -35,6 +36,7 @@ use crate::disk_quota::set_disk_quota; use crate::installed_extensions::get_installed_extensions; use crate::logger::startup_context_from_env; use crate::lsn_lease::launch_lsn_lease_bg_task_for_static; +use crate::metrics::COMPUTE_CTL_UP; use crate::monitor::launch_monitor; use crate::pg_helpers::*; use crate::rsyslog::{ @@ -49,6 +51,17 @@ use crate::{config, extension_server, local_proxy}; pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0); pub static PG_PID: AtomicU32 = AtomicU32::new(0); +// This is an arbitrary build tag. Fine as a default / for testing purposes +// in-case of not-set environment var +const BUILD_TAG_DEFAULT: &str = "latest"; +/// Build tag/version of the compute node binaries/image. It's tricky and ugly +/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a +/// global static variable. +pub static BUILD_TAG: Lazy = Lazy::new(|| { + option_env!("BUILD_TAG") + .unwrap_or(BUILD_TAG_DEFAULT) + .to_string() +}); /// Static configuration params that don't change after startup. These mostly /// come from the CLI args, or are derived from them. @@ -72,7 +85,6 @@ pub struct ComputeNodeParams { pub pgdata: String, pub pgbin: String, pub pgversion: String, - pub build_tag: String, /// The port that the compute's external HTTP server listens on pub external_http_port: u16, @@ -173,6 +185,11 @@ impl ComputeState { info!("Changing compute status from {} to {}", prev, status); self.status = status; state_changed.notify_all(); + + COMPUTE_CTL_UP.reset(); + COMPUTE_CTL_UP + .with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()]) + .set(1); } pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) { @@ -352,13 +369,19 @@ impl ComputeNode { } .launch(&this); - // The internal HTTP server could be launched later, but there isn't much - // sense in waiting. + // The internal HTTP server is needed for a further activation by control plane + // if compute was started for a pool, so we have to start server before hanging + // waiting for a spec. crate::http::server::Server::Internal { port: this.params.internal_http_port, } .launch(&this); + // HTTP server is running, so we can officially declare compute_ctl as 'up' + COMPUTE_CTL_UP + .with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()]) + .set(1); + // If we got a spec from the CLI already, use that. Otherwise wait for the // control plane to pass it to us with a /configure HTTP request let pspec = if let Some(cli_spec) = cli_spec { @@ -2032,12 +2055,8 @@ LIMIT 100", let mut download_tasks = Vec::new(); for library in &libs_vec { - let (ext_name, ext_path) = remote_extensions.get_ext( - library, - true, - &self.params.build_tag, - &self.params.pgversion, - )?; + let (ext_name, ext_path) = + remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?; download_tasks.push(self.download_extension(ext_name, ext_path)); } let results = join_all(download_tasks).await; diff --git a/compute_tools/src/http/routes/extension_server.rs b/compute_tools/src/http/routes/extension_server.rs index 563b73ae65..6508de6eee 100644 --- a/compute_tools/src/http/routes/extension_server.rs +++ b/compute_tools/src/http/routes/extension_server.rs @@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response}; use http::StatusCode; use serde::Deserialize; -use crate::compute::ComputeNode; +use crate::compute::{BUILD_TAG, ComputeNode}; use crate::http::JsonResponse; use crate::http::extract::{Path, Query}; @@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension( remote_extensions.get_ext( &filename, ext_server_params.is_library, - &compute.params.build_tag, + &BUILD_TAG, &compute.params.pgversion, ) }; diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 4caa48307e..52f1795703 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -1,7 +1,8 @@ use metrics::core::{AtomicF64, Collector, GenericGauge}; use metrics::proto::MetricFamily; use metrics::{ - IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec, + IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec, + register_int_gauge_vec, register_uint_gauge_vec, }; use once_cell::sync::Lazy; @@ -70,8 +71,19 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy> = Lazy::new( .expect("failed to define a metric") }); +// Report that `compute_ctl` is up and what's the current compute status. +pub(crate) static COMPUTE_CTL_UP: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "compute_ctl_up", + "Whether compute_ctl is running", + &["build_tag", "status"] + ) + .expect("failed to define a metric") +}); + pub fn collect() -> Vec { - let mut metrics = INSTALLED_EXTENSIONS.collect(); + let mut metrics = COMPUTE_CTL_UP.collect(); + metrics.extend(INSTALLED_EXTENSIONS.collect()); metrics.extend(CPLANE_REQUESTS_TOTAL.collect()); metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect()); metrics.extend(DB_MIGRATION_FAILED.collect()); diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 9b28246f58..4073ebc3b9 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -5,6 +5,8 @@ import urllib.parse import requests from requests.adapters import HTTPAdapter +from fixtures.log_helper import log + class EndpointHttpClient(requests.Session): def __init__( @@ -51,6 +53,7 @@ class EndpointHttpClient(requests.Session): def metrics(self) -> str: res = self.get(f"http://localhost:{self.external_port}/metrics") res.raise_for_status() + log.debug("raw compute metrics: %s", res.text) return res.text # Current compute status. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d3cb35fe49..34a841f59f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4209,7 +4209,7 @@ class Endpoint(PgProtocol, LogUtils): # Write it back updated with open(config_path, "w") as file: - log.info(json.dumps(dict(data_dict, **kwargs))) + log.debug(json.dumps(dict(data_dict, **kwargs))) json.dump(dict(data_dict, **kwargs), file, indent=4) def respec_deep(self, **kwargs: Any) -> None: @@ -4226,7 +4226,7 @@ class Endpoint(PgProtocol, LogUtils): with open(config_path) as f: data_dict: dict[str, Any] = json.load(f) - log.info("Current compute spec: %s", json.dumps(data_dict, indent=4)) + log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4)) for key, value in kwargs.items(): if isinstance(value, dict): @@ -4238,7 +4238,7 @@ class Endpoint(PgProtocol, LogUtils): data_dict[key] = value with open(config_path, "w") as file: - log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4)) + log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4)) json.dump(data_dict, file, indent=4) def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None: diff --git a/test_runner/regress/test_compute_reconfigure.py b/test_runner/regress/test_compute_reconfigure.py index ed453f3f8d..6396ba67a1 100644 --- a/test_runner/regress/test_compute_reconfigure.py +++ b/test_runner/regress/test_compute_reconfigure.py @@ -1,7 +1,9 @@ from __future__ import annotations +import os from typing import TYPE_CHECKING +from fixtures.metrics import parse_metrics from fixtures.utils import wait_until if TYPE_CHECKING: @@ -64,3 +66,20 @@ def test_compute_reconfigure(neon_simple_env: NeonEnv): row = cursor.fetchone() assert row is not None assert row[0] == TEST_LOG_LINE_PREFIX + + # Check that even after reconfigure and state transitions we still report + # only the current status. + client = endpoint.http_client() + raw_metrics = client.metrics() + metrics = parse_metrics(raw_metrics) + samples = metrics.query_all("compute_ctl_up") + assert len(samples) == 1 + assert samples[0].value == 1 + samples = metrics.query_all("compute_ctl_up", {"status": "running"}) + assert len(samples) == 1 + assert samples[0].value == 1 + # Check that build tag is reported + build_tag = os.environ.get("BUILD_TAG", "latest") + samples = metrics.query_all("compute_ctl_up", {"build_tag": build_tag}) + assert len(samples) == 1 + assert samples[0].value == 1