diff --git a/Cargo.lock b/Cargo.lock index 7df1c4ab7a..0584b9d6d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,6 +346,7 @@ dependencies = [ "serde_json", "tar", "tokio", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "workspace_hack", ] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 56047093f1..fc52ce4e83 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -17,4 +17,5 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" tar = "0.4" tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } workspace_hack = { version = "0.1", path = "../workspace_hack" } diff --git a/compute_tools/src/bin/zenith_ctl.rs b/compute_tools/src/bin/zenith_ctl.rs index 49ba653fa1..372afbc633 100644 --- a/compute_tools/src/bin/zenith_ctl.rs +++ b/compute_tools/src/bin/zenith_ctl.rs @@ -38,6 +38,7 @@ use clap::Arg; use log::info; use postgres::{Client, NoTls}; +use compute_tools::checker::create_writablity_check_data; use compute_tools::config; use compute_tools::http_api::launch_http_server; use compute_tools::logger::*; @@ -128,6 +129,7 @@ fn run_compute(state: &Arc>) -> Result { handle_roles(&read_state.spec, &mut client)?; handle_databases(&read_state.spec, &mut client)?; + create_writablity_check_data(&mut client)?; // 'Close' connection drop(client); diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs new file mode 100644 index 0000000000..63da6ea23e --- /dev/null +++ b/compute_tools/src/checker.rs @@ -0,0 +1,46 @@ +use std::sync::{Arc, RwLock}; + +use anyhow::{anyhow, Result}; +use log::error; +use postgres::Client; +use tokio_postgres::NoTls; + +use crate::zenith::ComputeState; + +pub fn create_writablity_check_data(client: &mut Client) -> Result<()> { + let query = " + CREATE TABLE IF NOT EXISTS health_check ( + id serial primary key, + updated_at timestamptz default now() + ); + INSERT INTO health_check VALUES (1, now()) + ON CONFLICT (id) DO UPDATE + SET updated_at = now();"; + let result = client.simple_query(query)?; + if result.len() < 2 { + return Err(anyhow::format_err!("executed {} queries", result.len())); + } + Ok(()) +} + +pub async fn check_writability(state: &Arc>) -> Result<()> { + let connstr = state.read().unwrap().connstr.clone(); + let (client, connection) = tokio_postgres::connect(&connstr, NoTls).await?; + if client.is_closed() { + return Err(anyhow!("connection to postgres closed")); + } + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + let result = client + .simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;") + .await?; + + if result.len() != 1 { + return Err(anyhow!("statement can't be executed")); + } + Ok(()) +} diff --git a/compute_tools/src/http_api.rs b/compute_tools/src/http_api.rs index 02fab08a6e..7e1a876044 100644 --- a/compute_tools/src/http_api.rs +++ b/compute_tools/src/http_api.rs @@ -11,7 +11,7 @@ use log::{error, info}; use crate::zenith::*; // Service function to handle all available routes. -fn routes(req: Request, state: Arc>) -> Response { +async fn routes(req: Request, state: Arc>) -> Response { match (req.method(), req.uri().path()) { // Timestamp of the last Postgres activity in the plain text. (&Method::GET, "/last_activity") => { @@ -29,6 +29,15 @@ fn routes(req: Request, state: Arc>) -> Response { + info!("serving /check_writability GET request"); + let res = crate::checker::check_writability(&state).await; + match res { + Ok(_) => Response::new(Body::from("true")), + Err(e) => Response::new(Body::from(e.to_string())), + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); @@ -48,7 +57,7 @@ async fn serve(state: Arc>) { async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let state = state.clone(); - async move { Ok::<_, Infallible>(routes(req, state)) } + async move { Ok::<_, Infallible>(routes(req, state).await) } })) } }); diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 592011d95e..ffb9700a49 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -2,6 +2,7 @@ //! Various tools and helpers to handle cluster / compute node (Postgres) //! configuration. //! +pub mod checker; pub mod config; pub mod http_api; #[macro_use]