From 4fae115dc22d28b769d930b040e22db6f2c1f93d Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Mon, 6 Sep 2021 14:40:32 +0300 Subject: [PATCH] propagate pageserver http error messages to zenith cli --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/storage.rs | 96 ++++++++++++++++++++++++---------- zenith_utils/src/http/error.rs | 4 +- 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e5ef536c7..d0d5d27e0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,6 +325,7 @@ dependencies = [ "serde", "serde_json", "tar", + "thiserror", "toml", "url", "walkeeper", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 769f776867..de1ead91d1 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -16,6 +16,7 @@ toml = "0.5" lazy_static = "1.4" regex = "1" anyhow = "1.0" +thiserror = "1" bytes = "1.0.1" nix = "0.20" url = "2.2.2" diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 13f18b9d0f..34e7678291 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -3,15 +3,17 @@ use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; use std::time::Duration; -use std::{io, thread}; +use std::{io, result, thread}; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest}; use postgres::{Config, NoTls}; -use reqwest::blocking::{Client, RequestBuilder}; +use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; +use thiserror::Error; +use zenith_utils::http::error::HttpErrorBody; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::ZTenantId; @@ -22,6 +24,39 @@ use zenith_utils::connstring::connection_address; const HTTP_BASE_URL: &str = "http://127.0.0.1:9898/v1"; +#[derive(Error, Debug)] +pub enum PageserverHttpError { + #[error("Reqwest error")] + Transport(#[from] reqwest::Error), + + #[error("Error: {0}")] + Response(String), +} + +type Result = result::Result; + +pub trait ResponseErrorMessageExt: Sized { + fn error_from_body(self) -> Result; +} + +impl ResponseErrorMessageExt for Response { + fn error_from_body(self) -> Result { + let status = self.status(); + if !(status.is_client_error() || status.is_server_error()) { + return Ok(self); + } + + // reqwest do not export it's error construction utility functions, so lets craft the message ourselves + let url = self.url().to_owned(); + Err(PageserverHttpError::Response( + match self.json::() { + Ok(err_body) => format!("Error: {}", err_body.msg), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }, + )) + } +} + // // Control routines for pageserver. // @@ -59,7 +94,7 @@ impl PageServerNode { .unwrap() } - pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> { + pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); let mut args = vec![ "--init", @@ -100,7 +135,7 @@ impl PageServerNode { self.repo_path().join("pageserver.pid") } - pub fn start(&self) -> Result<()> { + pub fn start(&self) -> anyhow::Result<()> { print!( "Starting pageserver at '{}' in '{}'", connection_address(&self.pg_connection_config), @@ -131,17 +166,24 @@ impl PageServerNode { return Ok(()); } Err(err) => { - if err.is_connect() && retries < 5 { - print!("."); - io::stdout().flush().unwrap(); - } else { - if retries == 5 { - print!("\n") // put a line break after dots for second message + match err { + PageserverHttpError::Transport(err) => { + if err.is_connect() && retries < 5 { + print!("."); + io::stdout().flush().unwrap(); + } else { + if retries == 5 { + print!("\n") // put a line break after dots for second message + } + println!( + "Pageserver not responding yet, err {} retrying ({})...", + err, retries + ); + } + } + PageserverHttpError::Response(msg) => { + bail!("pageserver failed to start: {} ", msg) } - println!( - "Pageserver not responding yet, err {} retrying ({})...", - err, retries - ); } thread::sleep(Duration::from_secs(1)); } @@ -150,7 +192,7 @@ impl PageServerNode { bail!("pageserver failed to start in {} seconds", RETRIES); } - pub fn stop(&self) -> Result<()> { + pub fn stop(&self) -> anyhow::Result<()> { let pid = read_pidfile(&self.pid_file())?; let pid = Pid::from_raw(pid); if kill(pid, Signal::SIGTERM).is_err() { @@ -179,7 +221,7 @@ impl PageServerNode { client.simple_query(sql).unwrap() } - pub fn page_server_psql_client(&self) -> Result { + pub fn page_server_psql_client(&self) -> result::Result { self.pg_connection_config.connect(NoTls) } @@ -191,40 +233,40 @@ impl PageServerNode { builder } - pub fn check_status(&self) -> reqwest::Result<()> { + pub fn check_status(&self) -> Result<()> { self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) .send()? - .error_for_status()?; + .error_from_body()?; Ok(()) } - pub fn tenant_list(&self) -> reqwest::Result> { + pub fn tenant_list(&self) -> Result> { Ok(self .http_request(Method::GET, format!("{}/{}", self.http_base_url, "tenant")) .send()? - .error_for_status()? + .error_from_body()? .json()?) } - pub fn tenant_create(&self, tenantid: ZTenantId) -> reqwest::Result<()> { + pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> { Ok(self .http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant")) .json(&TenantCreateRequest { tenant_id: tenantid, }) .send()? - .error_for_status()? + .error_from_body()? .json()?) } - pub fn branch_list(&self, tenantid: &ZTenantId) -> reqwest::Result> { + pub fn branch_list(&self, tenantid: &ZTenantId) -> Result> { Ok(self .http_request( Method::GET, format!("{}/branch/{}", self.http_base_url, tenantid), ) .send()? - .error_for_status()? + .error_from_body()? .json()?) } @@ -233,7 +275,7 @@ impl PageServerNode { branch_name: &str, startpoint: &str, tenantid: &ZTenantId, - ) -> reqwest::Result { + ) -> Result { Ok(self .http_request(Method::POST, format!("{}/branch", self.http_base_url)) .json(&BranchCreateRequest { @@ -242,7 +284,7 @@ impl PageServerNode { start_point: startpoint.to_owned(), }) .send()? - .error_for_status()? + .error_from_body()? .json()?) } diff --git a/zenith_utils/src/http/error.rs b/zenith_utils/src/http/error.rs index f8e3e274ed..a5348cb897 100644 --- a/zenith_utils/src/http/error.rs +++ b/zenith_utils/src/http/error.rs @@ -1,6 +1,6 @@ use anyhow::anyhow; use hyper::{header, Body, Response, StatusCode}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json; use thiserror::Error; @@ -45,7 +45,7 @@ impl ApiError { } } -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct HttpErrorBody { pub msg: String, }