diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 00b7340de5..13f18b9d0f 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,16 +1,17 @@ +use std::io::Write; use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; -use std::thread; use std::time::Duration; +use std::{io, thread}; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest}; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder}; -use reqwest::{IntoUrl, Method, StatusCode}; +use reqwest::{IntoUrl, Method}; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::ZTenantId; @@ -100,11 +101,12 @@ impl PageServerNode { } pub fn start(&self) -> Result<()> { - println!( - "Starting pageserver at '{}' in {}", + print!( + "Starting pageserver at '{}' in '{}'", connection_address(&self.pg_connection_config), self.repo_path().display() ); + io::stdout().flush().unwrap(); let mut cmd = Command::new(self.env.pageserver_bin()?); cmd.args(&["-D", self.repo_path().to_str().unwrap()]) @@ -121,22 +123,31 @@ impl PageServerNode { // It takes a while for the page server to start up. Wait until it is // open for business. 
- for retries in 1..15 { + const RETRIES: i8 = 15; + for retries in 1..RETRIES { match self.check_status() { Ok(_) => { - println!("Pageserver started"); + println!("\nPageserver started"); return Ok(()); } Err(err) => { - println!( - "Pageserver not responding yet, err {} retrying ({})...", - err, retries - ); + if err.is_connect() && retries < 5 { + print!("."); + io::stdout().flush().unwrap(); + } else { + if retries == 5 { + print!("\n") // put a line break after dots for second message + } + println!( + "Pageserver not responding yet, err {} retrying ({})...", + err, retries + ); + } thread::sleep(Duration::from_secs(1)); } } } - bail!("pageserver failed to start"); + bail!("pageserver failed to start in {} seconds", RETRIES); } pub fn stop(&self) -> Result<()> { @@ -180,19 +191,14 @@ impl PageServerNode { builder } - pub fn check_status(&self) -> Result<()> { - let status = self - .http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) + pub fn check_status(&self) -> reqwest::Result<()> { + self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) .send()? - .status(); - ensure!( - status == StatusCode::OK, - format!("got unexpected response status {}", status) - ); + .error_for_status()?; Ok(()) } - pub fn tenant_list(&self) -> Result> { + pub fn tenant_list(&self) -> reqwest::Result> { Ok(self .http_request(Method::GET, format!("{}/{}", self.http_base_url, "tenant")) .send()? @@ -200,7 +206,7 @@ impl PageServerNode { .json()?) } - pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> { + pub fn tenant_create(&self, tenantid: ZTenantId) -> reqwest::Result<()> { Ok(self .http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant")) .json(&TenantCreateRequest { @@ -211,7 +217,7 @@ impl PageServerNode { .json()?) 
} - pub fn branch_list(&self, tenantid: &ZTenantId) -> Result> { + pub fn branch_list(&self, tenantid: &ZTenantId) -> reqwest::Result> { Ok(self .http_request( Method::GET, @@ -227,7 +233,7 @@ impl PageServerNode { branch_name: &str, startpoint: &str, tenantid: &ZTenantId, - ) -> Result { + ) -> reqwest::Result { Ok(self .http_request(Method::POST, format!("{}/branch", self.http_base_url)) .json(&BranchCreateRequest { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fe4da44019..bf6cc90ece 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -281,6 +281,22 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // TODO: Check that it looks like a valid repository before going further + // bind sockets before daemonizing so we report errors early and do not return until we are listening + info!( + "Starting pageserver http handler on {}", + conf.http_endpoint_addr + ); + let http_listener = TcpListener::bind(conf.http_endpoint_addr.clone())?; + + info!( + "Starting pageserver pg protocol handler on {}", + conf.listen_addr + ); + let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; + + // Initialize tenant manager. 
+ tenant_mgr::init(conf); + if conf.daemonize { info!("daemonizing..."); @@ -301,6 +317,9 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { } } + // keep join handles for spawned threads + let mut join_handles = vec![]; + // initialize authentication for incoming connections let auth = match &conf.auth_type { AuthType::Trust | AuthType::MD5 => None, @@ -313,21 +332,16 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { info!("Using auth: {:#?}", conf.auth_type); // Spawn a new thread for the http endpoint + // bind before launching separate thread so the error is reported before startup exits let cloned = auth.clone(); - thread::Builder::new() + let http_endpoint_thread = thread::Builder::new() .name("http_endpoint_thread".into()) .spawn(move || { let router = http::make_router(conf, cloned); - endpoint::serve_thread_main(router, conf.http_endpoint_addr.clone()) + endpoint::serve_thread_main(router, http_listener) })?; - // Check that we can bind to address before starting threads to simplify shutdown - // sequence if port is occupied. - info!("Starting pageserver on {}", conf.listen_addr); - let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; - - // Initialize tenant manager. - tenant_mgr::init(conf); + join_handles.push(http_endpoint_thread); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. 
@@ -337,9 +351,13 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) })?; - page_service_thread - .join() - .expect("Page service thread has panicked")?; + join_handles.push(page_service_thread); + for handle in join_handles.into_iter() { + handle + .join() + .expect("thread panicked") + .expect("thread exited with an error") + } Ok(()) } diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 06f6afa4b9..f17c6c7448 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -1,3 +1,5 @@ +use std::net::TcpListener; + use crate::auth::{self, Claims, JwtAuth}; use crate::http::error; use crate::zid::ZTenantId; @@ -144,10 +146,9 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res pub fn serve_thread_main( router_builder: RouterBuilder, - addr: String, + listener: TcpListener, ) -> anyhow::Result<()> { - let addr = addr.parse()?; - log::info!("Starting an http endpoint at {}", addr); + log::info!("Starting an http endpoint at {}", listener.local_addr()?); // Create a Service from the router above to handle incoming requests. let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap(); @@ -159,7 +160,7 @@ pub fn serve_thread_main( let _guard = runtime.enter(); - let server = Server::bind(&addr).serve(service); + let server = Server::from_tcp(listener)?.serve(service); runtime.block_on(server)?;