diff --git a/Cargo.lock b/Cargo.lock index 865fb33889..c25950f78f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1228,12 +1228,15 @@ dependencies = [ "flate2", "futures", "hyper 0.14.30", + "metrics", "nix 0.27.1", "notify", "num_cpus", + "once_cell", "opentelemetry", "opentelemetry_sdk", "postgres", + "prometheus", "regex", "remote_storage", "reqwest 0.12.4", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 91e0b9d5b8..0bf4ed53d6 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -18,9 +18,11 @@ clap.workspace = true flate2.workspace = true futures.workspace = true hyper0 = { workspace = true, features = ["full"] } +metrics.workspace = true nix.workspace = true notify.workspace = true num_cpus.workspace = true +once_cell.workspace = true opentelemetry.workspace = true opentelemetry_sdk.workspace = true postgres.workspace = true @@ -39,6 +41,7 @@ tracing-subscriber.workspace = true tracing-utils.workspace = true thiserror.workspace = true url.workspace = true +prometheus.workspace = true compute_api.workspace = true utils.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 285be56264..f256a7430b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1121,6 +1121,7 @@ impl ComputeNode { self.pg_reload_conf()?; } self.post_apply_config()?; + self.get_installed_extensions()?; } let startup_end_time = Utc::now(); @@ -1489,20 +1490,29 @@ LIMIT 100", pub fn get_installed_extensions(&self) -> Result<()> { let connstr = self.connstr.clone(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("failed to create runtime"); - let result = rt - .block_on(crate::installed_extensions::get_installed_extensions( - connstr, - )) - .expect("failed to get installed extensions"); + let compute_state = self.state.lock().unwrap().clone(); + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let tenant_id = pspec.tenant_id; + let timeline_id = pspec.timeline_id; - info!( - "{}", - serde_json::to_string(&result).expect("failed to serialize extensions list") - ); + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create runtime"); + let result = rt + .block_on(crate::installed_extensions::get_installed_extensions( + connstr, + tenant_id, + timeline_id, + )) + .expect("failed to get installed extensions"); + + info!( + "{}", + serde_json::to_string(&result).expect("failed to serialize extensions list") + ); + }); Ok(()) } diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 79e6158081..5f2dc9993b 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -19,6 +19,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use tokio::task; use tracing::{debug, error, info, warn}; use tracing_utils::http::OtelName; +use utils::http::endpoint::prometheus_metrics_handler; use utils::http::request::must_get_query_param; fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { @@ -62,6 +63,18 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /metrics GET request"); + match prometheus_metrics_handler(req).await { + Ok(response) => response, + Err(err) => { + let msg = format!("error handling /metrics request: {err}"); + error!(msg); + render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR) + } + } + } // Collect Postgres current usage insights (&Method::GET, "/insights") => { info!("serving /insights GET request"); @@ -180,8 +193,15 @@ async fn routes(req: Request, compute: &Arc) -> Response render_json(Body::from(serde_json::to_string(&res).unwrap())), Err(e) => render_json_error( diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index e9fa66b323..31d6d0fd7a 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -37,6 +37,21 @@ paths: schema: $ref: "#/components/schemas/ComputeMetrics" + /metrics + get: + tags: + - Info + summary: Get compute node metrics in + description: "" + operationId: getComputeMetrics + responses: + 200: + description: ComputeMetrics + content: + text/plain: + schema: + type: string + description: Metrics in text format. /insights: get: tags: diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index 3d8b22a8a3..8855c592dd 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -2,11 +2,16 @@ use compute_api::responses::{InstalledExtension, InstalledExtensions}; use std::collections::HashMap; use std::collections::HashSet; use url::Url; +use utils::id::TenantId; +use utils::id::TimelineId; use anyhow::Result; use postgres::{Client, NoTls}; use tokio::task; +use metrics::{register_uint_gauge_vec, UIntGaugeVec}; +use once_cell::sync::Lazy; + /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. /// @@ -35,7 +40,11 @@ fn list_dbs(client: &mut Client) -> Result> { /// Connect to every database (see list_dbs above) and get the list of installed extensions. /// Same extension can be installed in multiple databases with different versions, /// we only keep the highest and lowest version across all databases. -pub async fn get_installed_extensions(connstr: Url) -> Result { +pub async fn get_installed_extensions( + connstr: Url, + tenant_id: TenantId, + timeline_id: TimelineId, +) -> Result { let mut connstr = connstr.clone(); task::spawn_blocking(move || { @@ -72,9 +81,97 @@ pub async fn get_installed_extensions(connstr: Url) -> Result = ext.versions.iter().cloned().collect(); + vec.sort(); + vec.iter() + .map(|x| x.to_string()) + .collect::>() + .join(",") + }; + + INSTALLED_EXTENSIONS + .with_label_values(&[ + &tenant_id.to_string(), + &timeline_id.to_string(), + &ext.extname, + &versions, + ]) + .set(ext.n_databases as u64); + } + + Ok(res) }) .await? } + +static INSTALLED_EXTENSIONS: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "installed_extensions", + "Number of databases where extension is installed, versions passed as label", + &["tenant_id", "timeline_id", "extension_name", "versions"] + ) + .expect("failed to define a metric") +}); + +#[cfg(test)] +mod tests { + use super::*; + //use metrics::{core::Collector}; + use metrics::{core::Collector, TextEncoder}; + + #[test] + fn test_installed_extensions() { + let tenant_id = "b0554b632bd4d547a63b86c3630317e8"; + let timeline_id = "2414a61ffc94e428f14b5758fe308e13"; + let ext = InstalledExtensions { + extensions: vec![ + InstalledExtension { + extname: "extension_1".to_string(), + versions: ["1.0".to_string(), "1.5".to_string(), "1.1".to_string()] + .iter() + .cloned() + .collect(), + n_databases: 5, + }, + InstalledExtension { + extname: "extension_2".to_string(), + versions: ["4.2".to_string()].iter().cloned().collect(), + n_databases: 2, + }, + ], + }; + + for ext in ext.extensions.iter() { + let versions = { + let mut vec: Vec<_> = ext.versions.iter().cloned().collect(); + vec.sort(); + vec.iter() + .map(|x| x.to_string()) + .collect::>() + .join(",") + }; + + INSTALLED_EXTENSIONS + .with_label_values(&[tenant_id, timeline_id, &ext.extname, &versions]) + .set(ext.n_databases as u64); + } + + let mut buffer = String::new(); + let metrics = INSTALLED_EXTENSIONS.collect(); + let encoder = TextEncoder::new(); + encoder.encode_utf8(&metrics, &mut buffer).unwrap(); + + print!("{}", buffer); + assert_eq!( + buffer, + "# HELP installed_extensions Number of databases where extension is installed, versions passed as label\n# TYPE installed_extensions gauge\ninstalled_extensions{extension_name=\"extension_2\",tenant_id=\"b0554b632bd4d547a63b86c3630317e8\",timeline_id=\"2414a61ffc94e428f14b5758fe308e13\",versions=\"4.2\"} 2\ninstalled_extensions{extension_name=\"extension_1\",tenant_id=\"b0554b632bd4d547a63b86c3630317e8\",timeline_id=\"2414a61ffc94e428f14b5758fe308e13\",versions=\"1.0,1.1,1.5\"} 5\n" + ) + } +} diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 26895df8a6..05c49e0380 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -28,3 +28,8 @@ class EndpointHttpClient(requests.Session): res = self.get(f"http://localhost:{self.port}/installed_extensions") res.raise_for_status() return res.json() + + def metrics(self): + res = self.get(f"http://localhost:{self.port}/metrics") + res.raise_for_status() + return res.text diff --git a/test_runner/regress/test_installed_extensions.py b/test_runner/regress/test_installed_extensions.py index 4700db85ee..558348c365 100644 --- a/test_runner/regress/test_installed_extensions.py +++ b/test_runner/regress/test_installed_extensions.py @@ -1,5 +1,7 @@ +import time from logging import info +from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import NeonEnv @@ -85,3 +87,26 @@ def test_installed_extensions(neon_simple_env: NeonEnv): assert ext["n_databases"] == 2 ext["versions"].sort() assert ext["versions"] == ["1.2", "1.3"] + + # check that /metrics endpoint is available + # ensure that we see the metric before and after restart + res = client.metrics() + info("Metrics: %s", res) + m = parse_metrics(res) + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 2 + + endpoint.stop() + endpoint.start() + + time.sleep(1) + + res = client.metrics() + info("After restart metrics: %s", res) + m = parse_metrics(res) + neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"}) + assert len(neon_m) == 1 + for sample in neon_m: + assert sample.value == 2