mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
compute_tools: check writability handler (#941)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -346,6 +346,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
|
||||
@@ -17,4 +17,5 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tar = "0.4"
|
||||
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
|
||||
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -38,6 +38,7 @@ use clap::Arg;
|
||||
use log::info;
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
use compute_tools::checker::create_writablity_check_data;
|
||||
use compute_tools::config;
|
||||
use compute_tools::http_api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
@@ -128,6 +129,7 @@ fn run_compute(state: &Arc<RwLock<ComputeState>>) -> Result<ExitStatus> {
|
||||
|
||||
handle_roles(&read_state.spec, &mut client)?;
|
||||
handle_databases(&read_state.spec, &mut client)?;
|
||||
create_writablity_check_data(&mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
46
compute_tools/src/checker.rs
Normal file
46
compute_tools/src/checker.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::error;
|
||||
use postgres::Client;
|
||||
use tokio_postgres::NoTls;
|
||||
|
||||
use crate::zenith::ComputeState;
|
||||
|
||||
pub fn create_writablity_check_data(client: &mut Client) -> Result<()> {
|
||||
let query = "
|
||||
CREATE TABLE IF NOT EXISTS health_check (
|
||||
id serial primary key,
|
||||
updated_at timestamptz default now()
|
||||
);
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = now();";
|
||||
let result = client.simple_query(query)?;
|
||||
if result.len() < 2 {
|
||||
return Err(anyhow::format_err!("executed {} queries", result.len()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn check_writability(state: &Arc<RwLock<ComputeState>>) -> Result<()> {
|
||||
let connstr = state.read().unwrap().connstr.clone();
|
||||
let (client, connection) = tokio_postgres::connect(&connstr, NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let result = client
|
||||
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
|
||||
.await?;
|
||||
|
||||
if result.len() != 1 {
|
||||
return Err(anyhow!("statement can't be executed"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -11,7 +11,7 @@ use log::{error, info};
|
||||
use crate::zenith::*;
|
||||
|
||||
// Service function to handle all available routes.
|
||||
fn routes(req: Request<Body>, state: Arc<RwLock<ComputeState>>) -> Response<Body> {
|
||||
async fn routes(req: Request<Body>, state: Arc<RwLock<ComputeState>>) -> Response<Body> {
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Timestamp of the last Postgres activity in the plain text.
|
||||
(&Method::GET, "/last_activity") => {
|
||||
@@ -29,6 +29,15 @@ fn routes(req: Request<Body>, state: Arc<RwLock<ComputeState>>) -> Response<Body
|
||||
Response::new(Body::from(format!("{}", state.ready)))
|
||||
}
|
||||
|
||||
(&Method::GET, "/check_writability") => {
|
||||
info!("serving /check_writability GET request");
|
||||
let res = crate::checker::check_writability(&state).await;
|
||||
match res {
|
||||
Ok(_) => Response::new(Body::from("true")),
|
||||
Err(e) => Response::new(Body::from(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
_ => {
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
@@ -48,7 +57,7 @@ async fn serve(state: Arc<RwLock<ComputeState>>) {
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
|
||||
let state = state.clone();
|
||||
async move { Ok::<_, Infallible>(routes(req, state)) }
|
||||
async move { Ok::<_, Infallible>(routes(req, state).await) }
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! Various tools and helpers to handle cluster / compute node (Postgres)
|
||||
//! configuration.
|
||||
//!
|
||||
pub mod checker;
|
||||
pub mod config;
|
||||
pub mod http_api;
|
||||
#[macro_use]
|
||||
|
||||
Reference in New Issue
Block a user