diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index fc7a3e2827..da11ac2860 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -45,7 +45,9 @@ use anyhow::{Context, Result}; use clap::Parser; use compute_api::responses::ComputeCtlConfig; use compute_api::spec::ComputeSpec; -use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal}; +use compute_tools::compute::{ + BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal, +}; use compute_tools::extension_server::get_pg_version_string; use compute_tools::logger::*; use compute_tools::params::*; @@ -57,10 +59,6 @@ use tracing::{error, info}; use url::Url; use utils::failpoint_support; -// this is an arbitrary build tag. Fine as a default / for testing purposes -// in-case of not-set environment var -const BUILD_TAG_DEFAULT: &str = "latest"; - // Compatibility hack: if the control plane specified any remote-ext-config // use the default value for extension storage proxy gateway. // Remove this once the control plane is updated to pass the gateway URL @@ -147,7 +145,7 @@ fn main() -> Result<()> { .build()?; let _rt_guard = runtime.enter(); - let build_tag = runtime.block_on(init())?; + runtime.block_on(init())?; // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; @@ -174,8 +172,6 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, - build_tag, - live_config_allowed: cli_spec.live_config_allowed, }, cli_spec.spec, @@ -189,7 +185,7 @@ fn main() -> Result<()> { deinit_and_exit(exit_code); } -async fn init() -> Result { +async fn init() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?; let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; @@ -199,12 +195,9 @@ async fn init() -> Result { } }); - let build_tag = option_env!("BUILD_TAG") - .unwrap_or(BUILD_TAG_DEFAULT) - .to_string(); - info!("build_tag: {build_tag}"); + info!("compute build_tag: {}", &BUILD_TAG.to_string()); - Ok(build_tag) + Ok(()) } fn try_spec_from_cli(cli: &Cli) -> Result { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 4126835c1a..f27bf164ae 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,6 +20,7 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; +use once_cell::sync::Lazy; use postgres; use postgres::NoTls; use postgres::error::SqlState; @@ -35,6 +36,7 @@ use crate::disk_quota::set_disk_quota; use crate::installed_extensions::get_installed_extensions; use crate::logger::startup_context_from_env; use crate::lsn_lease::launch_lsn_lease_bg_task_for_static; +use crate::metrics::COMPUTE_CTL_UP; use crate::monitor::launch_monitor; use crate::pg_helpers::*; use crate::rsyslog::{ @@ -49,6 +51,17 @@ use crate::{config, extension_server, local_proxy}; pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0); pub static PG_PID: AtomicU32 = AtomicU32::new(0); +// This is an arbitrary build tag. Fine as a default / for testing purposes +// in-case of not-set environment var +const BUILD_TAG_DEFAULT: &str = "latest"; +/// Build tag/version of the compute node binaries/image. It's tricky and ugly +/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a +/// global static variable. +pub static BUILD_TAG: Lazy = Lazy::new(|| { + option_env!("BUILD_TAG") + .unwrap_or(BUILD_TAG_DEFAULT) + .to_string() +}); /// Static configuration params that don't change after startup. These mostly /// come from the CLI args, or are derived from them. @@ -72,7 +85,6 @@ pub struct ComputeNodeParams { pub pgdata: String, pub pgbin: String, pub pgversion: String, - pub build_tag: String, /// The port that the compute's external HTTP server listens on pub external_http_port: u16, @@ -173,6 +185,11 @@ impl ComputeState { info!("Changing compute status from {} to {}", prev, status); self.status = status; state_changed.notify_all(); + + COMPUTE_CTL_UP.reset(); + COMPUTE_CTL_UP + .with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()]) + .set(1); } pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) { @@ -352,13 +369,19 @@ impl ComputeNode { } .launch(&this); - // The internal HTTP server could be launched later, but there isn't much - // sense in waiting. + // The internal HTTP server is needed for a further activation by control plane + // if compute was started for a pool, so we have to start server before hanging + // waiting for a spec. crate::http::server::Server::Internal { port: this.params.internal_http_port, } .launch(&this); + // HTTP server is running, so we can officially declare compute_ctl as 'up' + COMPUTE_CTL_UP + .with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()]) + .set(1); + // If we got a spec from the CLI already, use that. Otherwise wait for the // control plane to pass it to us with a /configure HTTP request let pspec = if let Some(cli_spec) = cli_spec { @@ -2032,12 +2055,8 @@ LIMIT 100", let mut download_tasks = Vec::new(); for library in &libs_vec { - let (ext_name, ext_path) = remote_extensions.get_ext( - library, - true, - &self.params.build_tag, - &self.params.pgversion, - )?; + let (ext_name, ext_path) = + remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?; download_tasks.push(self.download_extension(ext_name, ext_path)); } let results = join_all(download_tasks).await; diff --git a/compute_tools/src/http/routes/extension_server.rs b/compute_tools/src/http/routes/extension_server.rs index 563b73ae65..6508de6eee 100644 --- a/compute_tools/src/http/routes/extension_server.rs +++ b/compute_tools/src/http/routes/extension_server.rs @@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response}; use http::StatusCode; use serde::Deserialize; -use crate::compute::ComputeNode; +use crate::compute::{BUILD_TAG, ComputeNode}; use crate::http::JsonResponse; use crate::http::extract::{Path, Query}; @@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension( remote_extensions.get_ext( &filename, ext_server_params.is_library, - &compute.params.build_tag, + &BUILD_TAG, &compute.params.pgversion, ) }; diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 4caa48307e..52f1795703 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -1,7 +1,8 @@ use metrics::core::{AtomicF64, Collector, GenericGauge}; use metrics::proto::MetricFamily; use metrics::{ - IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec, + IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec, + register_int_gauge_vec, register_uint_gauge_vec, }; use once_cell::sync::Lazy; @@ -70,8 +71,19 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy> = Lazy::new( .expect("failed to define a metric") }); +// Report that `compute_ctl` is up and what's the current compute status. +pub(crate) static COMPUTE_CTL_UP: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "compute_ctl_up", + "Whether compute_ctl is running", + &["build_tag", "status"] + ) + .expect("failed to define a metric") +}); + pub fn collect() -> Vec { - let mut metrics = INSTALLED_EXTENSIONS.collect(); + let mut metrics = COMPUTE_CTL_UP.collect(); + metrics.extend(INSTALLED_EXTENSIONS.collect()); metrics.extend(CPLANE_REQUESTS_TOTAL.collect()); metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect()); metrics.extend(DB_MIGRATION_FAILED.collect()); diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 9b28246f58..4073ebc3b9 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -5,6 +5,8 @@ import urllib.parse import requests from requests.adapters import HTTPAdapter +from fixtures.log_helper import log + class EndpointHttpClient(requests.Session): def __init__( @@ -51,6 +53,7 @@ class EndpointHttpClient(requests.Session): def metrics(self) -> str: res = self.get(f"http://localhost:{self.external_port}/metrics") res.raise_for_status() + log.debug("raw compute metrics: %s", res.text) return res.text # Current compute status. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d3cb35fe49..34a841f59f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4209,7 +4209,7 @@ class Endpoint(PgProtocol, LogUtils): # Write it back updated with open(config_path, "w") as file: - log.info(json.dumps(dict(data_dict, **kwargs))) + log.debug(json.dumps(dict(data_dict, **kwargs))) json.dump(dict(data_dict, **kwargs), file, indent=4) def respec_deep(self, **kwargs: Any) -> None: @@ -4226,7 +4226,7 @@ class Endpoint(PgProtocol, LogUtils): with open(config_path) as f: data_dict: dict[str, Any] = json.load(f) - log.info("Current compute spec: %s", json.dumps(data_dict, indent=4)) + log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4)) for key, value in kwargs.items(): if isinstance(value, dict): @@ -4238,7 +4238,7 @@ class Endpoint(PgProtocol, LogUtils): data_dict[key] = value with open(config_path, "w") as file: - log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4)) + log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4)) json.dump(data_dict, file, indent=4) def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None: diff --git a/test_runner/regress/test_compute_reconfigure.py b/test_runner/regress/test_compute_reconfigure.py index ed453f3f8d..6396ba67a1 100644 --- a/test_runner/regress/test_compute_reconfigure.py +++ b/test_runner/regress/test_compute_reconfigure.py @@ -1,7 +1,9 @@ from __future__ import annotations +import os from typing import TYPE_CHECKING +from fixtures.metrics import parse_metrics from fixtures.utils import wait_until if TYPE_CHECKING: @@ -64,3 +66,20 @@ def test_compute_reconfigure(neon_simple_env: NeonEnv): row = cursor.fetchone() assert row is not None assert row[0] == TEST_LOG_LINE_PREFIX + + # Check that even after reconfigure and state transitions we still report + # only the current status. + client = endpoint.http_client() + raw_metrics = client.metrics() + metrics = parse_metrics(raw_metrics) + samples = metrics.query_all("compute_ctl_up") + assert len(samples) == 1 + assert samples[0].value == 1 + samples = metrics.query_all("compute_ctl_up", {"status": "running"}) + assert len(samples) == 1 + assert samples[0].value == 1 + # Check that build tag is reported + build_tag = os.environ.get("BUILD_TAG", "latest") + samples = metrics.query_all("compute_ctl_up", {"build_tag": build_tag}) + assert len(samples) == 1 + assert samples[0].value == 1