mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
Added insights route
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
//! -b /usr/local/bin/postgres
|
||||
//! ```
|
||||
//!
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::fs::File;
|
||||
use std::panic;
|
||||
use std::path::Path;
|
||||
@@ -141,6 +142,7 @@ fn main() -> Result<()> {
|
||||
pageserver_connstr,
|
||||
metrics: ComputeMetrics::new(),
|
||||
state: RwLock::new(ComputeState::new()),
|
||||
insights_count: AtomicU64::new(0),
|
||||
};
|
||||
let compute = Arc::new(compute_state);
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ use chrono::{DateTime, Utc};
|
||||
use log::info;
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::{Serialize, Serializer};
|
||||
use tokio_postgres;
|
||||
|
||||
use crate::checker::create_writablity_check_data;
|
||||
use crate::config;
|
||||
@@ -48,6 +49,19 @@ pub struct ComputeNode {
|
||||
/// to allow HTTP API server to serve status requests, while configuration
|
||||
/// is in progress.
|
||||
pub state: RwLock<ComputeState>,
|
||||
pub insights_count: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct InsightsRow {
|
||||
pub backend_type: String,
|
||||
pub count: String,
|
||||
}
|
||||
|
||||
#[derive (Serialize)]
|
||||
pub struct Insights {
|
||||
count: u64,
|
||||
rows: Vec<InsightsRow>,
|
||||
}
|
||||
|
||||
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
|
||||
@@ -352,4 +366,32 @@ impl ComputeNode {
|
||||
self.prepare_pgdata()?;
|
||||
self.run()
|
||||
}
|
||||
|
||||
pub async fn collect_insights(&self) -> Insights {
|
||||
let prev = self.insights_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let mut result_rows: Vec<InsightsRow> = Vec::new();
|
||||
let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
|
||||
let (client, connection) = connect_result.unwrap();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
for message in (client.simple_query("SELECT backend_type, count(*) FROM pg_stat_activity GROUP BY backend_type").await).unwrap().iter() {
|
||||
match message {
|
||||
postgres::SimpleQueryMessage::Row(row) => {
|
||||
result_rows.push(InsightsRow {
|
||||
backend_type: row.get(0).unwrap().to_string(),
|
||||
count: row.get(1).unwrap().to_string(),
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
return Insights {
|
||||
count: prev + 1,
|
||||
rows: result_rows,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,13 @@ async fn routes(req: Request<Body>, compute: Arc<ComputeNode>) -> Response<Body>
|
||||
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
|
||||
}
|
||||
|
||||
// Collect pg stat metrics
|
||||
(&Method::GET, "/insights") => {
|
||||
info!("serving /insights GET request");
|
||||
let insights = compute.collect_insights().await;
|
||||
Response::new(Body::from(serde_json::to_string(&insights).unwrap()))
|
||||
}
|
||||
|
||||
// DEPRECATED, use POST instead
|
||||
(&Method::GET, "/check_writability") => {
|
||||
info!("serving /check_writability GET request");
|
||||
|
||||
Reference in New Issue
Block a user