diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c8af8822b7..8ceef44d61 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -25,6 +25,7 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; use serde::{Serialize, Serializer}; +use tokio_postgres; use tracing::{info, instrument, warn}; use crate::checker::create_writability_check_data; @@ -284,6 +285,7 @@ impl ComputeNode { handle_role_deletions(self, &mut client)?; handle_grants(self, &mut client)?; create_writability_check_data(&mut client)?; + handle_extensions(&self.spec, &mut client)?; // 'Close' connection drop(client); @@ -400,4 +402,43 @@ impl ComputeNode { Ok(()) } + + /// Select `pg_stat_statements` data and return it as a stringified JSON + pub async fn collect_insights(&self) -> String { + let mut result_rows: Vec = Vec::new(); + let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await; + let (client, connection) = connect_result.unwrap(); + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + let result = client + .simple_query( + "SELECT + row_to_json(pg_stat_statements) +FROM + pg_stat_statements +WHERE + userid != 'cloud_admin'::regrole::oid +ORDER BY + (mean_exec_time + mean_plan_time) DESC +LIMIT 100", + ) + .await; + + if let Ok(raw_rows) = result { + for message in raw_rows.iter() { + if let postgres::SimpleQueryMessage::Row(row) = message { + if let Some(json) = row.get(0) { + result_rows.push(json.to_string()); + } + } + } + + format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(",")) + } else { + "{{\"pg_stat_statements\": []}}".to_string() + } + } } diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 589a8e1434..2392863303 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -33,6 +33,13 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /insights GET request"); + let insights = compute.collect_insights().await; + Response::new(Body::from(insights)) + } + (&Method::POST, "/check_writability") => { info!("serving /check_writability POST request"); let res = crate::checker::check_writability(compute).await; diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index a857531d26..3a8e9fc1dc 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -10,12 +10,12 @@ paths: /status: get: tags: - - "info" + - Info summary: Get compute node internal status description: "" operationId: getComputeStatus responses: - "200": + 200: description: ComputeState content: application/json: @@ -25,27 +25,43 @@ paths: /metrics.json: get: tags: - - "info" + - Info summary: Get compute node startup metrics in JSON format description: "" operationId: getComputeMetricsJSON responses: - "200": + 200: description: ComputeMetrics content: application/json: schema: $ref: "#/components/schemas/ComputeMetrics" + /insights: + get: + tags: + - Info + summary: Get current compute insights in JSON format + description: | + Note, that this doesn't include any historical data + operationId: getComputeInsights + responses: + 200: + description: Compute insights + content: + application/json: + schema: + $ref: "#/components/schemas/ComputeInsights" + /check_writability: post: tags: - - "check" + - Check summary: Check that we can write new data on this compute description: "" operationId: checkComputeWritability responses: - "200": + 200: description: Check result content: text/plain: @@ -96,6 +112,15 @@ components: type: string description: Text of the error during compute startup, if any + ComputeInsights: + type: object + properties: + pg_stat_statements: + description: Contains raw output from pg_stat_statements in JSON format + type: array + items: + type: object + ComputeStatus: type: string enum: diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 6ab2864721..6a1377b6aa 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -63,6 +63,8 @@ impl GenericOption { /// Represent `GenericOption` as configuration option. pub fn to_pg_setting(&self) -> String { if let Some(val) = &self.value { + // TODO: check in the console DB that we don't have these settings + // set for any non-deleted project and drop this override. let name = match self.name.as_str() { "safekeepers" => "neon.safekeepers", "wal_acceptor_reconnect" => "neon.safekeeper_reconnect_timeout", diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index bbd0ec21ed..47f1d69cff 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -515,3 +515,18 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { Ok(()) } + +/// Create required system extensions +#[instrument(skip_all)] +pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> { + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + if libs.contains("pg_stat_statements") { + // Create extension only if this compute really needs it + let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements"; + info!("creating system extensions with query: {}", query); + client.simple_query(query)?; + } + } + + Ok(()) +}