Compare commits

...

4 Commits

Author SHA1 Message Date
Anastasia Lubennikova
6ac7188c82 fix test timeout logic 2024-10-16 15:48:56 +01:00
Anastasia Lubennikova
2575be6dec remove redundant labels 2024-10-15 14:34:26 +01:00
Anastasia Lubennikova
e7d9f9245d Review fixes:
- do not use default collector, collect only one metric;
- remove unit test, same behavior is covered by python test;
- code cleanup
2024-10-15 12:36:45 +01:00
Anastasia Lubennikova
b78e140fa8 Add installed_extensions prometheus metric
and add /metrics endpoint to compute_ctl to expose such metrics
2024-10-14 10:32:56 +01:00
8 changed files with 140 additions and 15 deletions

3
Cargo.lock generated
View File

@@ -1228,12 +1228,15 @@ dependencies = [
"flate2",
"futures",
"hyper 0.14.30",
"metrics",
"nix 0.27.1",
"notify",
"num_cpus",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"postgres",
"prometheus",
"regex",
"remote_storage",
"reqwest 0.12.4",

View File

@@ -18,9 +18,11 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
metrics.workspace = true
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
postgres.workspace = true
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
prometheus.workspace = true
compute_api.workspace = true
utils.workspace = true

View File

@@ -1121,6 +1121,7 @@ impl ComputeNode {
self.pg_reload_conf()?;
}
self.post_apply_config()?;
self.get_installed_extensions()?;
}
let startup_end_time = Utc::now();
@@ -1489,20 +1490,22 @@ LIMIT 100",
pub fn get_installed_extensions(&self) -> Result<()> {
let connstr = self.connstr.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
});
Ok(())
}

View File

@@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError;
use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use crate::installed_extensions;
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -16,6 +17,8 @@ use anyhow::Result;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
@@ -62,6 +65,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
// Prometheus metrics
(&Method::GET, "/metrics") => {
debug!("serving /metrics GET request");
let mut buffer = vec![];
let metrics = installed_extensions::collect();
let encoder = TextEncoder::new();
encoder.encode(&metrics, &mut buffer).unwrap();
match Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
{
Ok(response) => response,
Err(err) => {
let msg = format!("error handling /metrics request: {err}");
error!(msg);
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");

View File

@@ -37,6 +37,21 @@ paths:
schema:
$ref: "#/components/schemas/ComputeMetrics"
/metrics:
get:
tags:
- Info
summary: Get compute node metrics in Prometheus text format
description: ""
operationId: getComputeMetrics
responses:
200:
description: ComputeMetrics
content:
text/plain:
schema:
type: string
description: Metrics in text format.
/insights:
get:
tags:

View File

@@ -1,4 +1,5 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;
use std::collections::HashSet;
use url::Url;
@@ -7,6 +8,10 @@ use anyhow::Result;
use postgres::{Client, NoTls};
use tokio::task;
use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
@@ -72,9 +77,40 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
}
}
Ok(InstalledExtensions {
let res = InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
})
};
// set the prometheus metrics
for ext in res.extensions.iter() {
let versions = {
let mut vec: Vec<_> = ext.versions.iter().cloned().collect();
vec.sort();
vec.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
};
INSTALLED_EXTENSIONS
.with_label_values(&[&ext.extname, &versions])
.set(ext.n_databases as u64);
}
Ok(res)
})
.await?
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"installed_extensions",
"Number of databases where extension is installed, versions passed as label",
&["extension_name", "versions"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}

View File

@@ -28,3 +28,8 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/installed_extensions")
res.raise_for_status()
return res.json()
def metrics(self):
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text

View File

@@ -1,5 +1,7 @@
import time
from logging import info
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import NeonEnv
@@ -85,3 +87,36 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.2", "1.3"]
# check that /metrics endpoint is available
# ensure that we see the metric before and after restart
res = client.metrics()
info("Metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2
endpoint.stop()
endpoint.start()
timeout = 5
while timeout > 0:
try:
res = client.metrics()
timeout = -1
except Exception as e:
info("failed to get metrics, assume they are not collected yet: %s", e)
time.sleep(1)
timeout -= 1
continue
info("After restart metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"}
)
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2