mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 04:52:55 +00:00
Pass PageServerConf as static ref.
It's created once early in server startup, after parsing the command-line options, and never modified afterwards. To simplify things, pass it around as static ref, instead of making copies in all the different structs. We still pass around a reference to it, rather than putting it in a global variable, to allow unit testing with different configs in the same process.
This commit is contained in:
@@ -142,10 +142,12 @@ fn main() -> Result<()> {
|
||||
conf.gc_period = parse(period)?;
|
||||
}
|
||||
|
||||
start_pageserver(&conf)
|
||||
let leaked: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
start_pageserver(leaked)
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
|
||||
let log_filename = "pageserver.log";
|
||||
// Don't open the same file for output multiple times;
|
||||
// the different fds could overwrite each other's output.
|
||||
@@ -207,12 +209,11 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
|
||||
// Spawn a thread to listen for connections. It will spawn further threads
|
||||
// for each connection.
|
||||
let conf_copy = conf.clone();
|
||||
let page_server_thread = thread::Builder::new()
|
||||
.name("Page Service thread".into())
|
||||
.spawn(move || {
|
||||
// thread code
|
||||
page_service::thread_main(&conf_copy);
|
||||
page_service::thread_main(conf);
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(page_server_thread);
|
||||
|
||||
@@ -14,7 +14,7 @@ lazy_static! {
|
||||
pub static ref REPOSITORY: Mutex<Option<Arc<dyn Repository + Send + Sync>>> = Mutex::new(None);
|
||||
}
|
||||
|
||||
pub fn init(conf: &PageServerConf) {
|
||||
pub fn init(conf: &'static PageServerConf) {
|
||||
let mut m = REPOSITORY.lock().unwrap();
|
||||
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
|
||||
@@ -424,7 +424,7 @@ impl PagestreamBeMessage {
|
||||
///
|
||||
/// Listens for connections, and launches a new handler thread for each.
|
||||
///
|
||||
pub fn thread_main(conf: &PageServerConf) {
|
||||
pub fn thread_main(conf: &'static PageServerConf) {
|
||||
info!("Starting page server on {}", conf.listen_addr);
|
||||
|
||||
let listener = TcpListener::bind(conf.listen_addr).unwrap();
|
||||
@@ -433,7 +433,7 @@ pub fn thread_main(conf: &PageServerConf) {
|
||||
let (socket, peer_addr) = listener.accept().unwrap();
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
socket.set_nodelay(true).unwrap();
|
||||
let mut conn_handler = Connection::new(conf.clone(), socket);
|
||||
let mut conn_handler = Connection::new(conf, socket);
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(err) = conn_handler.run() {
|
||||
@@ -448,11 +448,11 @@ struct Connection {
|
||||
stream_in: BufReader<TcpStream>,
|
||||
stream: BufWriter<TcpStream>,
|
||||
init_done: bool,
|
||||
conf: PageServerConf,
|
||||
conf: &'static PageServerConf,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(conf: PageServerConf, socket: TcpStream) -> Connection {
|
||||
pub fn new(conf: &'static PageServerConf, socket: TcpStream) -> Connection {
|
||||
Connection {
|
||||
stream_in: BufReader::new(socket.try_clone().unwrap()),
|
||||
stream: BufWriter::new(socket),
|
||||
|
||||
@@ -335,10 +335,13 @@ mod tests {
|
||||
workdir: repo_dir.into(),
|
||||
pg_distrib_dir: "".into(),
|
||||
};
|
||||
// Make a static copy of the config. This can never be free'd, but that's
|
||||
// OK in a test.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
let walredo_mgr = TestRedoManager {};
|
||||
|
||||
let repo = rocksdb::RocksRepository::new(&conf, Arc::new(walredo_mgr));
|
||||
let repo = rocksdb::RocksRepository::new(conf, Arc::new(walredo_mgr));
|
||||
|
||||
Ok(Box::new(repo))
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use zenith_utils::seqwait::SeqWait;
|
||||
static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub struct RocksRepository {
|
||||
conf: PageServerConf,
|
||||
conf: &'static PageServerConf,
|
||||
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
|
||||
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
@@ -178,9 +178,12 @@ impl CacheEntryContent {
|
||||
}
|
||||
|
||||
impl RocksRepository {
|
||||
pub fn new(conf: &PageServerConf, walredo_mgr: Arc<dyn WalRedoManager>) -> RocksRepository {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksRepository {
|
||||
RocksRepository {
|
||||
conf: conf.clone(),
|
||||
conf: conf,
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
walredo_mgr,
|
||||
}
|
||||
@@ -204,22 +207,22 @@ impl Repository for RocksRepository {
|
||||
match timelines.get(&timelineid) {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => {
|
||||
let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
|
||||
let timeline = RocksTimeline::new(self.conf, timelineid, self.walredo_mgr.clone());
|
||||
|
||||
restore_timeline(&self.conf, &timeline, timelineid)?;
|
||||
restore_timeline(self.conf, &timeline, timelineid)?;
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
|
||||
timelines.insert(timelineid, timeline_rc.clone());
|
||||
|
||||
if self.conf.gc_horizon != 0 {
|
||||
let conf_copy = self.conf.clone();
|
||||
let timeline_rc_copy = timeline_rc.clone();
|
||||
let conf = self.conf;
|
||||
let _gc_thread = thread::Builder::new()
|
||||
.name("Garbage collection thread".into())
|
||||
.spawn(move || {
|
||||
// FIXME
|
||||
timeline_rc_copy.do_gc(&conf_copy).expect("GC thread died");
|
||||
timeline_rc_copy.do_gc(conf).expect("GC thread died");
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
@@ -262,7 +265,7 @@ impl RocksTimeline {
|
||||
}
|
||||
|
||||
fn new(
|
||||
conf: &PageServerConf,
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksTimeline {
|
||||
@@ -372,7 +375,7 @@ impl RocksTimeline {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
|
||||
fn do_gc(&self, conf: &'static PageServerConf) -> anyhow::Result<Bytes> {
|
||||
loop {
|
||||
thread::sleep(conf.gc_period);
|
||||
let last_lsn = self.get_last_valid_lsn();
|
||||
|
||||
@@ -45,7 +45,7 @@ lazy_static! {
|
||||
|
||||
// Launch a new WAL receiver, or tell one that's running about change in connection string
|
||||
pub fn launch_wal_receiver(
|
||||
conf: &PageServerConf,
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
wal_producer_connstr: &str,
|
||||
) {
|
||||
@@ -62,11 +62,10 @@ pub fn launch_wal_receiver(
|
||||
receivers.insert(timelineid, receiver);
|
||||
|
||||
// Also launch a new thread to handle this connection
|
||||
let conf_copy = conf.clone();
|
||||
let _walreceiver_thread = thread::Builder::new()
|
||||
.name("WAL receiver thread".into())
|
||||
.spawn(move || {
|
||||
thread_main(&conf_copy, timelineid);
|
||||
thread_main(conf, timelineid);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
@@ -87,7 +86,7 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
|
||||
//
|
||||
// This is the entry point for the WAL receiver thread.
|
||||
//
|
||||
fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
|
||||
fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId) {
|
||||
info!(
|
||||
"WAL receiver thread started for timeline : '{}'",
|
||||
timelineid
|
||||
|
||||
@@ -75,7 +75,7 @@ pub struct PostgresRedoManager {
|
||||
}
|
||||
|
||||
struct PostgresRedoManagerInternal {
|
||||
_conf: PageServerConf,
|
||||
_conf: &'static PageServerConf,
|
||||
|
||||
request_rx: mpsc::Receiver<WalRedoRequest>,
|
||||
}
|
||||
@@ -106,7 +106,7 @@ impl PostgresRedoManager {
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
/// This launches a new thread to handle the requests.
|
||||
pub fn new(conf: &PageServerConf) -> PostgresRedoManager {
|
||||
pub fn new(conf: &'static PageServerConf) -> PostgresRedoManager {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
//
|
||||
@@ -115,7 +115,6 @@ impl PostgresRedoManager {
|
||||
// Get mutable references to the values that we need to pass to the
|
||||
// thread.
|
||||
let request_rx = rx;
|
||||
let conf_copy = conf.clone();
|
||||
|
||||
// Currently, the join handle is not saved anywhere and we
|
||||
// won't try restart the thread if it dies.
|
||||
@@ -123,7 +122,7 @@ impl PostgresRedoManager {
|
||||
.name("WAL redo thread".into())
|
||||
.spawn(move || {
|
||||
let mut internal = PostgresRedoManagerInternal {
|
||||
_conf: conf_copy,
|
||||
_conf: conf,
|
||||
request_rx,
|
||||
};
|
||||
internal.wal_redo_main();
|
||||
|
||||
Reference in New Issue
Block a user