From 4b73ada26ed168a810b5c590bcb72e630182be68 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 1 Sep 2021 12:51:12 +0300 Subject: [PATCH] fix connection error that appeared on zenith start by binding sockets before daemonization. Also use less annoying error reporting: do not print full error messages for connect errors during the first several connection retries. Closes #507 --- control_plane/src/storage.rs | 54 +++++++++++++++++-------------- pageserver/src/bin/pageserver.rs | 42 +++++++++++++++++------- zenith_utils/src/http/endpoint.rs | 9 +++--- 3 files changed, 65 insertions(+), 40 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 00b7340de5..13f18b9d0f 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,16 +1,17 @@ +use std::io::Write; use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; -use std::thread; use std::time::Duration; +use std::{io, thread}; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest}; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder}; -use reqwest::{IntoUrl, Method, StatusCode}; +use reqwest::{IntoUrl, Method}; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::ZTenantId; @@ -100,11 +101,12 @@ impl PageServerNode { } pub fn start(&self) -> Result<()> { - println!( - "Starting pageserver at '{}' in {}", + print!( + "Starting pageserver at '{}' in '{}'", connection_address(&self.pg_connection_config), self.repo_path().display() ); + io::stdout().flush().unwrap(); let mut cmd = Command::new(self.env.pageserver_bin()?); cmd.args(&["-D", self.repo_path().to_str().unwrap()]) @@ -121,22 +123,31 @@ impl PageServerNode { // It takes a while for the page server to start up. Wait until it is // open for business. 
- for retries in 1..15 { + const RETRIES: i8 = 15; + for retries in 1..RETRIES { match self.check_status() { Ok(_) => { - println!("Pageserver started"); + println!("\nPageserver started"); return Ok(()); } Err(err) => { - println!( - "Pageserver not responding yet, err {} retrying ({})...", - err, retries - ); + if err.is_connect() && retries < 5 { + print!("."); + io::stdout().flush().unwrap(); + } else { + if retries == 5 { + print!("\n") // put a line break after dots for second message + } + println!( + "Pageserver not responding yet, err {} retrying ({})...", + err, retries + ); + } thread::sleep(Duration::from_secs(1)); } } } - bail!("pageserver failed to start"); + bail!("pageserver failed to start in {} seconds", RETRIES); } pub fn stop(&self) -> Result<()> { @@ -180,19 +191,14 @@ impl PageServerNode { builder } - pub fn check_status(&self) -> Result<()> { - let status = self - .http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) + pub fn check_status(&self) -> reqwest::Result<()> { + self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) .send()? - .status(); - ensure!( - status == StatusCode::OK, - format!("got unexpected response status {}", status) - ); + .error_for_status()?; Ok(()) } - pub fn tenant_list(&self) -> Result> { + pub fn tenant_list(&self) -> reqwest::Result> { Ok(self .http_request(Method::GET, format!("{}/{}", self.http_base_url, "tenant")) .send()? @@ -200,7 +206,7 @@ impl PageServerNode { .json()?) } - pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> { + pub fn tenant_create(&self, tenantid: ZTenantId) -> reqwest::Result<()> { Ok(self .http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant")) .json(&TenantCreateRequest { @@ -211,7 +217,7 @@ impl PageServerNode { .json()?) 
} - pub fn branch_list(&self, tenantid: &ZTenantId) -> Result> { + pub fn branch_list(&self, tenantid: &ZTenantId) -> reqwest::Result> { Ok(self .http_request( Method::GET, @@ -227,7 +233,7 @@ impl PageServerNode { branch_name: &str, startpoint: &str, tenantid: &ZTenantId, - ) -> Result { + ) -> reqwest::Result { Ok(self .http_request(Method::POST, format!("{}/branch", self.http_base_url)) .json(&BranchCreateRequest { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fe4da44019..bf6cc90ece 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -281,6 +281,22 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // TODO: Check that it looks like a valid repository before going further + // bind sockets before daemonizing so we report errors early and do not return until we are listening + info!( + "Starting pageserver http handler on {}", + conf.http_endpoint_addr + ); + let http_listener = TcpListener::bind(conf.http_endpoint_addr.clone())?; + + info!( + "Starting pageserver pg protocol handler on {}", + conf.listen_addr + ); + let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; + + // Initialize tenant manager. 
+ tenant_mgr::init(conf); + if conf.daemonize { info!("daemonizing..."); @@ -301,6 +317,9 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { } } + // keep join handles for spawned threads + let mut join_handles = vec![]; + // initialize authentication for incoming connections let auth = match &conf.auth_type { AuthType::Trust | AuthType::MD5 => None, @@ -313,21 +332,16 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { info!("Using auth: {:#?}", conf.auth_type); // Spawn a new thread for the http endpoint + // bind before launching separate thread so the error is reported before startup exits let cloned = auth.clone(); - thread::Builder::new() + let http_endpoint_thread = thread::Builder::new() .name("http_endpoint_thread".into()) .spawn(move || { let router = http::make_router(conf, cloned); - endpoint::serve_thread_main(router, conf.http_endpoint_addr.clone()) + endpoint::serve_thread_main(router, http_listener) })?; - // Check that we can bind to address before starting threads to simplify shutdown - // sequence if port is occupied. - info!("Starting pageserver on {}", conf.listen_addr); - let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; - - // Initialize tenant manager. - tenant_mgr::init(conf); + join_handles.push(http_endpoint_thread); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. 
@@ -337,9 +351,13 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) })?; - page_service_thread - .join() - .expect("Page service thread has panicked")?; + join_handles.push(page_service_thread); + for handle in join_handles.into_iter() { + handle + .join() + .expect("thread panicked") + .expect("thread exited with an error") + } Ok(()) } diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 06f6afa4b9..f17c6c7448 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -1,3 +1,5 @@ +use std::net::TcpListener; + use crate::auth::{self, Claims, JwtAuth}; use crate::http::error; use crate::zid::ZTenantId; @@ -144,10 +146,9 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res pub fn serve_thread_main( router_builder: RouterBuilder, - addr: String, + listener: TcpListener, ) -> anyhow::Result<()> { - let addr = addr.parse()?; - log::info!("Starting an http endpoint at {}", addr); + log::info!("Starting a http endoint at {}", listener.local_addr()?); // Create a Service from the router above to handle incoming requests. let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap(); @@ -159,7 +160,7 @@ pub fn serve_thread_main( let _guard = runtime.enter(); - let server = Server::bind(&addr).serve(service); + let server = Server::from_tcp(listener)?.serve(service); runtime.block_on(server)?;