Bind to socket earlier during pageserver init.

That allows printing reasonable error message instead of panicking if
address is already in use.
This commit is contained in:
Stas Kelvich
2021-05-20 12:36:59 +03:00
parent 1f6ca23db6
commit d45839879c
2 changed files with 23 additions and 28 deletions

View File

@@ -4,12 +4,15 @@
use log::*;
use parse_duration::parse;
use std::fs::{File, OpenOptions};
use std::io;
use std::process::exit;
use std::thread;
use std::time::Duration;
use std::{env, path::PathBuf};
use std::{
fs::{File, OpenOptions},
net::TcpListener,
};
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -168,21 +171,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Note: this `info!(...)` macro comes from `log` crate
info!("standard logging redirected to slog");
let tui_thread: Option<thread::JoinHandle<()>>;
if conf.interactive {
let tui_thread = if conf.interactive {
// Initialize the UI
tui_thread = Some(
Some(
thread::Builder::new()
.name("UI thread".into())
.spawn(|| {
let _ = tui::ui_main();
})
.unwrap(),
);
//threads.push(tui_thread);
)
} else {
tui_thread = None;
}
None
};
// TODO: Check that it looks like a valid repository before going further
if conf.daemonize {
info!("daemonizing...");
@@ -204,32 +207,28 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
}
}
let mut threads = Vec::new();
// TODO: Check that it looks like a valid repository before going further
// Check that we can bind to address before further initialization
info!("Starting pageserver on {}", conf.listen_addr);
let pageserver_listener = TcpListener::bind(conf.listen_addr)?;
// Initialize page cache, this will spawn walredo_thread
page_cache::init(conf);
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
let page_server_thread = thread::Builder::new()
let page_service_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
// thread code
page_service::thread_main(conf);
})
.unwrap();
threads.push(page_server_thread);
.spawn(move || page_service::thread_main(conf, pageserver_listener))?;
if let Some(tui_thread) = tui_thread {
// The TUI thread exits when the user asks to Quit.
tui_thread.join().unwrap();
} else {
// In non-interactive mode, wait forever.
for t in threads {
t.join().unwrap()
}
page_service_thread
.join()
.expect("Page service thread has panicked")?
}
Ok(())
}

View File

@@ -424,13 +424,9 @@ impl PagestreamBeMessage {
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(conf: &'static PageServerConf) {
info!("Starting page server on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr).unwrap();
pub fn thread_main(conf: &'static PageServerConf, listener: TcpListener) -> anyhow::Result<()> {
loop {
let (socket, peer_addr) = listener.accept().unwrap();
let (socket, peer_addr) = listener.accept()?;
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let mut conn_handler = Connection::new(conf, socket);