fix connection error appeared on zenith start

by binding sockets before daemonization

also use less annoying error reporting by not printing full error
messages for connect errors in first several connection retries

closes #507
This commit is contained in:
Dmitry Rodionov
2021-09-01 12:51:12 +03:00
committed by Dmitry
parent b4ecae33e4
commit 4b73ada26e
3 changed files with 65 additions and 40 deletions

View File

@@ -1,16 +1,17 @@
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::thread;
use std::time::Duration;
use std::{io, thread};
use anyhow::{anyhow, bail, ensure, Result};
use anyhow::{anyhow, bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest};
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder};
use reqwest::{IntoUrl, Method, StatusCode};
use reqwest::{IntoUrl, Method};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
@@ -100,11 +101,12 @@ impl PageServerNode {
}
pub fn start(&self) -> Result<()> {
println!(
"Starting pageserver at '{}' in {}",
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
self.repo_path().display()
);
io::stdout().flush().unwrap();
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-D", self.repo_path().to_str().unwrap()])
@@ -121,22 +123,31 @@ impl PageServerNode {
// It takes a while for the page server to start up. Wait until it is
// open for business.
for retries in 1..15 {
const RETRIES: i8 = 15;
for retries in 1..RETRIES {
match self.check_status() {
Ok(_) => {
println!("Pageserver started");
println!("\nPageserver started");
return Ok(());
}
Err(err) => {
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
if err.is_connect() && retries < 5 {
print!(".");
io::stdout().flush().unwrap();
} else {
if retries == 5 {
print!("\n") // put a line break after dots for second message
}
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
}
thread::sleep(Duration::from_secs(1));
}
}
}
bail!("pageserver failed to start");
bail!("pageserver failed to start in {} seconds", RETRIES);
}
pub fn stop(&self) -> Result<()> {
@@ -180,19 +191,14 @@ impl PageServerNode {
builder
}
pub fn check_status(&self) -> Result<()> {
let status = self
.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
pub fn check_status(&self) -> reqwest::Result<()> {
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
.send()?
.status();
ensure!(
status == StatusCode::OK,
format!("got unexpected response status {}", status)
);
.error_for_status()?;
Ok(())
}
pub fn tenant_list(&self) -> Result<Vec<String>> {
pub fn tenant_list(&self) -> reqwest::Result<Vec<String>> {
Ok(self
.http_request(Method::GET, format!("{}/{}", self.http_base_url, "tenant"))
.send()?
@@ -200,7 +206,7 @@ impl PageServerNode {
.json()?)
}
pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> {
pub fn tenant_create(&self, tenantid: ZTenantId) -> reqwest::Result<()> {
Ok(self
.http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant"))
.json(&TenantCreateRequest {
@@ -211,7 +217,7 @@ impl PageServerNode {
.json()?)
}
pub fn branch_list(&self, tenantid: &ZTenantId) -> Result<Vec<BranchInfo>> {
pub fn branch_list(&self, tenantid: &ZTenantId) -> reqwest::Result<Vec<BranchInfo>> {
Ok(self
.http_request(
Method::GET,
@@ -227,7 +233,7 @@ impl PageServerNode {
branch_name: &str,
startpoint: &str,
tenantid: &ZTenantId,
) -> Result<BranchInfo> {
) -> reqwest::Result<BranchInfo> {
Ok(self
.http_request(Method::POST, format!("{}/branch", self.http_base_url))
.json(&BranchCreateRequest {

View File

@@ -281,6 +281,22 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// TODO: Check that it looks like a valid repository before going further
// bind sockets before daemonizing so we report errors early and do not return until we are listening
info!(
"Starting pageserver http handler on {}",
conf.http_endpoint_addr
);
let http_listener = TcpListener::bind(conf.http_endpoint_addr.clone())?;
info!(
"Starting pageserver pg protocol handler on {}",
conf.listen_addr
);
let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?;
// Initialize tenant manager.
tenant_mgr::init(conf);
if conf.daemonize {
info!("daemonizing...");
@@ -301,6 +317,9 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
}
}
// keep join handles for spawned threads
let mut join_handles = vec![];
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,
@@ -313,21 +332,16 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
info!("Using auth: {:#?}", conf.auth_type);
// Spawn a new thread for the http endpoint
// bind before launching separate thread so the error reported before startup exits
let cloned = auth.clone();
thread::Builder::new()
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, conf.http_endpoint_addr.clone())
endpoint::serve_thread_main(router, http_listener)
})?;
// Check that we can bind to address before starting threads to simplify shutdown
// sequence if port is occupied.
info!("Starting pageserver on {}", conf.listen_addr);
let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?;
// Initialize tenant manager.
tenant_mgr::init(conf);
join_handles.push(http_endpoint_thread);
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
@@ -337,9 +351,13 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
page_service_thread
.join()
.expect("Page service thread has panicked")?;
join_handles.push(page_service_thread);
for handle in join_handles.into_iter() {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error")
}
Ok(())
}

View File

@@ -1,3 +1,5 @@
use std::net::TcpListener;
use crate::auth::{self, Claims, JwtAuth};
use crate::http::error;
use crate::zid::ZTenantId;
@@ -144,10 +146,9 @@ pub fn check_permission(req: &Request<Body>, tenantid: Option<ZTenantId>) -> Res
pub fn serve_thread_main(
router_builder: RouterBuilder<hyper::Body, ApiError>,
addr: String,
listener: TcpListener,
) -> anyhow::Result<()> {
let addr = addr.parse()?;
log::info!("Starting an http endpoint at {}", addr);
log::info!("Starting a http endoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
@@ -159,7 +160,7 @@ pub fn serve_thread_main(
let _guard = runtime.enter();
let server = Server::bind(&addr).serve(service);
let server = Server::from_tcp(listener)?.serve(service);
runtime.block_on(server)?;