Add installed_extensions prometheus metric

and add /metrics endpoint to compute_ctl to expose such metrics
This commit is contained in:
Anastasia Lubennikova
2024-10-14 10:32:56 +01:00
parent 72ef0e0fa1
commit b78e140fa8
8 changed files with 195 additions and 17 deletions

3
Cargo.lock generated
View File

@@ -1228,12 +1228,15 @@ dependencies = [
"flate2",
"futures",
"hyper 0.14.30",
"metrics",
"nix 0.27.1",
"notify",
"num_cpus",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"postgres",
"prometheus",
"regex",
"remote_storage",
"reqwest 0.12.4",

View File

@@ -18,9 +18,11 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
metrics.workspace = true
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
postgres.workspace = true
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
prometheus.workspace = true
compute_api.workspace = true
utils.workspace = true

View File

@@ -1121,6 +1121,7 @@ impl ComputeNode {
self.pg_reload_conf()?;
}
self.post_apply_config()?;
self.get_installed_extensions()?;
}
let startup_end_time = Utc::now();
@@ -1489,20 +1490,29 @@ LIMIT 100",
pub fn get_installed_extensions(&self) -> Result<()> {
let connstr = self.connstr.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let tenant_id = pspec.tenant_id;
let timeline_id = pspec.timeline_id;
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
tenant_id,
timeline_id,
))
.expect("failed to get installed extensions");
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
});
Ok(())
}

View File

@@ -19,6 +19,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use tokio::task;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::http::endpoint::prometheus_metrics_handler;
use utils::http::request::must_get_query_param;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
@@ -62,6 +63,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
// Prometheus metrics
(&Method::GET, "/metrics") => {
info!("serving /metrics GET request");
match prometheus_metrics_handler(req).await {
Ok(response) => response,
Err(err) => {
let msg = format!("error handling /metrics request: {err}");
error!(msg);
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
@@ -180,8 +193,15 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
return Response::new(Body::from(msg));
}
let compute_state = compute.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let connstr = compute.connstr.clone();
let res = crate::installed_extensions::get_installed_extensions(connstr).await;
let res = crate::installed_extensions::get_installed_extensions(
connstr,
pspec.tenant_id,
pspec.timeline_id,
)
.await;
match res {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
Err(e) => render_json_error(

View File

@@ -37,6 +37,21 @@ paths:
schema:
$ref: "#/components/schemas/ComputeMetrics"
/metrics
get:
tags:
- Info
summary: Get compute node metrics in
description: ""
operationId: getComputeMetrics
responses:
200:
description: ComputeMetrics
content:
text/plain:
schema:
type: string
description: Metrics in text format.
/insights:
get:
tags:

View File

@@ -2,11 +2,16 @@ use compute_api::responses::{InstalledExtension, InstalledExtensions};
use std::collections::HashMap;
use std::collections::HashSet;
use url::Url;
use utils::id::TenantId;
use utils::id::TimelineId;
use anyhow::Result;
use postgres::{Client, NoTls};
use tokio::task;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
@@ -35,7 +40,11 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
pub async fn get_installed_extensions(
connstr: Url,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<InstalledExtensions> {
let mut connstr = connstr.clone();
task::spawn_blocking(move || {
@@ -72,9 +81,97 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
}
}
Ok(InstalledExtensions {
let res = InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
})
};
// set the prometheus metrics
for ext in res.extensions.iter() {
let versions = {
let mut vec: Vec<_> = ext.versions.iter().cloned().collect();
vec.sort();
vec.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
};
INSTALLED_EXTENSIONS
.with_label_values(&[
&tenant_id.to_string(),
&timeline_id.to_string(),
&ext.extname,
&versions,
])
.set(ext.n_databases as u64);
}
Ok(res)
})
.await?
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"installed_extensions",
"Number of databases where extension is installed, versions passed as label",
&["tenant_id", "timeline_id", "extension_name", "versions"]
)
.expect("failed to define a metric")
});
#[cfg(test)]
mod tests {
use super::*;
//use metrics::{core::Collector};
use metrics::{core::Collector, TextEncoder};
#[test]
fn test_installed_extensions() {
let tenant_id = "b0554b632bd4d547a63b86c3630317e8";
let timeline_id = "2414a61ffc94e428f14b5758fe308e13";
let ext = InstalledExtensions {
extensions: vec![
InstalledExtension {
extname: "extension_1".to_string(),
versions: ["1.0".to_string(), "1.5".to_string(), "1.1".to_string()]
.iter()
.cloned()
.collect(),
n_databases: 5,
},
InstalledExtension {
extname: "extension_2".to_string(),
versions: ["4.2".to_string()].iter().cloned().collect(),
n_databases: 2,
},
],
};
for ext in ext.extensions.iter() {
let versions = {
let mut vec: Vec<_> = ext.versions.iter().cloned().collect();
vec.sort();
vec.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
};
INSTALLED_EXTENSIONS
.with_label_values(&[tenant_id, timeline_id, &ext.extname, &versions])
.set(ext.n_databases as u64);
}
let mut buffer = String::new();
let metrics = INSTALLED_EXTENSIONS.collect();
let encoder = TextEncoder::new();
encoder.encode_utf8(&metrics, &mut buffer).unwrap();
print!("{}", buffer);
assert_eq!(
buffer,
"# HELP installed_extensions Number of databases where extension is installed, versions passed as label\n# TYPE installed_extensions gauge\ninstalled_extensions{extension_name=\"extension_2\",tenant_id=\"b0554b632bd4d547a63b86c3630317e8\",timeline_id=\"2414a61ffc94e428f14b5758fe308e13\",versions=\"4.2\"} 2\ninstalled_extensions{extension_name=\"extension_1\",tenant_id=\"b0554b632bd4d547a63b86c3630317e8\",timeline_id=\"2414a61ffc94e428f14b5758fe308e13\",versions=\"1.0,1.1,1.5\"} 5\n"
)
}
}

View File

@@ -28,3 +28,8 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/installed_extensions")
res.raise_for_status()
return res.json()
def metrics(self):
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text

View File

@@ -1,5 +1,7 @@
import time
from logging import info
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import NeonEnv
@@ -85,3 +87,26 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.2", "1.3"]
# check that /metrics endpoint is available
# ensure that we see the metric before and after restart
res = client.metrics()
info("Metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2
endpoint.stop()
endpoint.start()
time.sleep(1)
res = client.metrics()
info("After restart metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2