diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index fc5bbc5fd2..02ee9ad9f6 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -26,6 +26,7 @@ //! -b /usr/local/bin/postgres //! ``` //! +use std::sync::atomic::AtomicU64; use std::fs::File; use std::panic; use std::path::Path; @@ -141,6 +142,7 @@ fn main() -> Result<()> { pageserver_connstr, metrics: ComputeMetrics::new(), state: RwLock::new(ComputeState::new()), + insights_count: AtomicU64::new(0), }; let compute = Arc::new(compute_state); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index a254ade50b..21302b772e 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -26,6 +26,7 @@ use chrono::{DateTime, Utc}; use log::info; use postgres::{Client, NoTls}; use serde::{Serialize, Serializer}; +use tokio_postgres; use crate::checker::create_writablity_check_data; use crate::config; @@ -48,6 +49,19 @@ pub struct ComputeNode { /// to allow HTTP API server to serve status requests, while configuration /// is in progress. pub state: RwLock, + pub insights_count: AtomicU64, +} + +#[derive(Debug, Serialize)] +pub struct InsightsRow { + pub backend_type: String, + pub count: String, +} + +#[derive (Serialize)] +pub struct Insights { + count: u64, + rows: Vec, } fn rfc3339_serialize(x: &DateTime, s: S) -> Result @@ -352,4 +366,32 @@ impl ComputeNode { self.prepare_pgdata()?; self.run() } + + pub async fn collect_insights(&self) -> Insights { + let prev = self.insights_count.fetch_add(1, Ordering::Relaxed); + + let mut result_rows: Vec = Vec::new(); + let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await; + let (client, connection) = connect_result.unwrap(); + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + for message in (client.simple_query("SELECT backend_type, count(*) FROM pg_stat_activity GROUP BY backend_type").await).unwrap().iter() { + match message { + postgres::SimpleQueryMessage::Row(row) => { + result_rows.push(InsightsRow { + backend_type: row.get(0).unwrap().to_string(), + count: row.get(1).unwrap().to_string(), + }); + } + _ => {} + } + } + return Insights { + count: prev + 1, + rows: result_rows, + }; + } } diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 4c8bbc608b..559de99253 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -46,6 +46,13 @@ async fn routes(req: Request, compute: Arc) -> Response Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap())) } + // Collect pg stat metrics + (&Method::GET, "/insights") => { + info!("serving /insights GET request"); + let insights = compute.collect_insights().await; + Response::new(Body::from(serde_json::to_string(&insights).unwrap())) + } + // DEPRECATED, use POST instead (&Method::GET, "/check_writability") => { info!("serving /check_writability GET request");