propagate pageserver http error messages to zenith cli

This commit is contained in:
Dmitry Rodionov
2021-09-06 14:40:32 +03:00
committed by Dmitry
parent 3d17255400
commit 4fae115dc2
4 changed files with 73 additions and 29 deletions

1
Cargo.lock generated
View File

@@ -325,6 +325,7 @@ dependencies = [
"serde",
"serde_json",
"tar",
"thiserror",
"toml",
"url",
"walkeeper",

View File

@@ -16,6 +16,7 @@ toml = "0.5"
lazy_static = "1.4"
regex = "1"
anyhow = "1.0"
thiserror = "1"
bytes = "1.0.1"
nix = "0.20"
url = "2.2.2"

View File

@@ -3,15 +3,17 @@ use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
use std::{io, thread};
use std::{io, result, thread};
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest};
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
@@ -22,6 +24,39 @@ use zenith_utils::connstring::connection_address;
const HTTP_BASE_URL: &str = "http://127.0.0.1:9898/v1";
#[derive(Error, Debug)]
pub enum PageserverHttpError {
#[error("Reqwest error")]
Transport(#[from] reqwest::Error),
#[error("Error: {0}")]
Response(String),
}
type Result<T> = result::Result<T, PageserverHttpError>;
pub trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> Result<Self>;
}
impl ResponseErrorMessageExt for Response {
fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
// reqwest do not export it's error construction utility functions, so lets craft the message ourselves
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
}
}
//
// Control routines for pageserver.
//
@@ -59,7 +94,7 @@ impl PageServerNode {
.unwrap()
}
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> {
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let mut args = vec![
"--init",
@@ -100,7 +135,7 @@ impl PageServerNode {
self.repo_path().join("pageserver.pid")
}
pub fn start(&self) -> Result<()> {
pub fn start(&self) -> anyhow::Result<()> {
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
@@ -131,17 +166,24 @@ impl PageServerNode {
return Ok(());
}
Err(err) => {
if err.is_connect() && retries < 5 {
print!(".");
io::stdout().flush().unwrap();
} else {
if retries == 5 {
print!("\n") // put a line break after dots for second message
match err {
PageserverHttpError::Transport(err) => {
if err.is_connect() && retries < 5 {
print!(".");
io::stdout().flush().unwrap();
} else {
if retries == 5 {
print!("\n") // put a line break after dots for second message
}
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
}
}
PageserverHttpError::Response(msg) => {
bail!("pageserver failed to start: {} ", msg)
}
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
}
thread::sleep(Duration::from_secs(1));
}
@@ -150,7 +192,7 @@ impl PageServerNode {
bail!("pageserver failed to start in {} seconds", RETRIES);
}
pub fn stop(&self) -> Result<()> {
pub fn stop(&self) -> anyhow::Result<()> {
let pid = read_pidfile(&self.pid_file())?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
@@ -179,7 +221,7 @@ impl PageServerNode {
client.simple_query(sql).unwrap()
}
pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
pub fn page_server_psql_client(&self) -> result::Result<postgres::Client, postgres::Error> {
self.pg_connection_config.connect(NoTls)
}
@@ -191,40 +233,40 @@ impl PageServerNode {
builder
}
pub fn check_status(&self) -> reqwest::Result<()> {
pub fn check_status(&self) -> Result<()> {
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
.send()?
.error_for_status()?;
.error_from_body()?;
Ok(())
}
pub fn tenant_list(&self) -> reqwest::Result<Vec<String>> {
pub fn tenant_list(&self) -> Result<Vec<String>> {
Ok(self
.http_request(Method::GET, format!("{}/{}", self.http_base_url, "tenant"))
.send()?
.error_for_status()?
.error_from_body()?
.json()?)
}
pub fn tenant_create(&self, tenantid: ZTenantId) -> reqwest::Result<()> {
pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> {
Ok(self
.http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant"))
.json(&TenantCreateRequest {
tenant_id: tenantid,
})
.send()?
.error_for_status()?
.error_from_body()?
.json()?)
}
pub fn branch_list(&self, tenantid: &ZTenantId) -> reqwest::Result<Vec<BranchInfo>> {
pub fn branch_list(&self, tenantid: &ZTenantId) -> Result<Vec<BranchInfo>> {
Ok(self
.http_request(
Method::GET,
format!("{}/branch/{}", self.http_base_url, tenantid),
)
.send()?
.error_for_status()?
.error_from_body()?
.json()?)
}
@@ -233,7 +275,7 @@ impl PageServerNode {
branch_name: &str,
startpoint: &str,
tenantid: &ZTenantId,
) -> reqwest::Result<BranchInfo> {
) -> Result<BranchInfo> {
Ok(self
.http_request(Method::POST, format!("{}/branch", self.http_base_url))
.json(&BranchCreateRequest {
@@ -242,7 +284,7 @@ impl PageServerNode {
start_point: startpoint.to_owned(),
})
.send()?
.error_for_status()?
.error_from_body()?
.json()?)
}

View File

@@ -1,6 +1,6 @@
use anyhow::anyhow;
use hyper::{header, Body, Response, StatusCode};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json;
use thiserror::Error;
@@ -45,7 +45,7 @@ impl ApiError {
}
}
#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
pub struct HttpErrorBody {
pub msg: String,
}