mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
Add installed_extensions prometheus metric (#9608)
and add /metrics endpoint to compute_ctl to expose such metrics
metric format example for extension pg_rag
with versions 1.2.3 and 1.4.2
installed in 3 and 1 databases respectively:
neon_extensions_installed{extension="pg_rag", version="1.2.3"} = 3
neon_extensions_installed{extension="pg_rag", version="1.4.2"} = 1
------
infra part: https://github.com/neondatabase/flux-fleet/pull/251
---------
Co-authored-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
committed by
GitHub
parent
7595d3afe6
commit
080d585b22
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1229,12 +1229,15 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"hyper 0.14.30",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"postgres",
|
||||
"prometheus",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
|
||||
@@ -18,9 +18,11 @@ clap.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
num_cpus.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
opentelemetry_sdk.workspace = true
|
||||
postgres.workspace = true
|
||||
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
prometheus.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError;
|
||||
use crate::catalog::{get_database_schema, get_dbs_and_roles};
|
||||
use crate::compute::forward_termination_signal;
|
||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use crate::installed_extensions;
|
||||
use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
|
||||
use compute_api::responses::{
|
||||
ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
|
||||
@@ -19,6 +20,8 @@ use anyhow::Result;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use metrics::Encoder;
|
||||
use metrics::TextEncoder;
|
||||
use tokio::task;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
@@ -65,6 +68,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
|
||||
}
|
||||
|
||||
// Prometheus metrics
|
||||
(&Method::GET, "/metrics") => {
|
||||
debug!("serving /metrics GET request");
|
||||
|
||||
let mut buffer = vec![];
|
||||
let metrics = installed_extensions::collect();
|
||||
let encoder = TextEncoder::new();
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
match Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
let msg = format!("error handling /metrics request: {err}");
|
||||
error!(msg);
|
||||
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Collect Postgres current usage insights
|
||||
(&Method::GET, "/insights") => {
|
||||
info!("serving /insights GET request");
|
||||
|
||||
@@ -37,6 +37,21 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ComputeMetrics"
|
||||
|
||||
/metrics
|
||||
get:
|
||||
tags:
|
||||
- Info
|
||||
summary: Get compute node metrics in text format.
|
||||
description: ""
|
||||
operationId: getComputeMetrics
|
||||
responses:
|
||||
200:
|
||||
description: ComputeMetrics
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Metrics in text format.
|
||||
/insights:
|
||||
get:
|
||||
tags:
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use compute_api::responses::{InstalledExtension, InstalledExtensions};
|
||||
use metrics::proto::MetricFamily;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use tracing::info;
|
||||
@@ -8,6 +9,10 @@ use anyhow::Result;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio::task;
|
||||
|
||||
use metrics::core::Collector;
|
||||
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
/// We don't reuse get_existing_dbs() just for code clarity
|
||||
/// and to make database listing query here more explicit.
|
||||
///
|
||||
@@ -59,6 +64,12 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
|
||||
|
||||
for (extname, v) in extensions.iter() {
|
||||
let version = v.to_string();
|
||||
|
||||
// increment the number of databases where the version of extension is installed
|
||||
INSTALLED_EXTENSIONS
|
||||
.with_label_values(&[extname, &version])
|
||||
.inc();
|
||||
|
||||
extensions_map
|
||||
.entry(extname.to_string())
|
||||
.and_modify(|e| {
|
||||
@@ -74,9 +85,11 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InstalledExtensions {
|
||||
let res = InstalledExtensions {
|
||||
extensions: extensions_map.values().cloned().collect(),
|
||||
})
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
@@ -97,6 +110,18 @@ pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"installed_extensions",
|
||||
"Number of databases where the version of extension is installed",
|
||||
&["extension_name", "version"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
INSTALLED_EXTENSIONS.collect()
|
||||
}
|
||||
|
||||
@@ -46,3 +46,8 @@ class EndpointHttpClient(requests.Session):
|
||||
)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def metrics(self) -> str:
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
res.raise_for_status()
|
||||
return res.text
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
from logging import info
|
||||
from __future__ import annotations
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
import time
|
||||
from logging import info
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
@@ -85,3 +93,52 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.2", "1.3"]
|
||||
|
||||
# check that /metrics endpoint is available
|
||||
# ensure that we see the metric before and after restart
|
||||
res = client.metrics()
|
||||
info("Metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 2
|
||||
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
timeout = 10
|
||||
while timeout > 0:
|
||||
try:
|
||||
res = client.metrics()
|
||||
timeout = -1
|
||||
if len(parse_metrics(res).query_all("installed_extensions")) < 4:
|
||||
# Assume that not all metrics that are collected yet
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
continue
|
||||
except Exception:
|
||||
log.exception("failed to get metrics, assume they are not collected yet")
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
continue
|
||||
|
||||
assert (
|
||||
len(parse_metrics(res).query_all("installed_extensions")) >= 4
|
||||
), "Not all metrics are collected"
|
||||
|
||||
info("After restart metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
Reference in New Issue
Block a user