diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index af2fb8c2f7..8bbe31fdd7 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -98,10 +98,10 @@ fn main() -> Result<()> { conf.listen_addr = addr.parse().unwrap(); } - start_pageserver(conf) + start_pageserver(&conf) } -fn start_pageserver(conf: PageServerConf) -> Result<()> { +fn start_pageserver(conf: &PageServerConf) -> Result<()> { // Initialize logger let _scope_guard = init_logging(&conf)?; let _log_guard = slog_stdlog::init().unwrap(); @@ -186,13 +186,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<()> { // // All other wal receivers are started on demand by "callmemaybe" command // sent to pageserver. - let conf_copy = conf.clone(); - if let Some(wal_producer) = conf.wal_producer_connstr { - let conf = conf_copy.clone(); + if let Some(wal_producer) = &conf.wal_producer_connstr { + let conf_copy = conf.clone(); + let wal_producer = wal_producer.clone(); let walreceiver_thread = thread::Builder::new() .name("static WAL receiver thread".into()) .spawn(move || { - walreceiver::thread_main(conf, &wal_producer); + walreceiver::thread_main(&conf_copy, &wal_producer); }) .unwrap(); threads.push(walreceiver_thread); @@ -200,12 +200,12 @@ fn start_pageserver(conf: PageServerConf) -> Result<()> { // GetPage@LSN requests are served by another thread. (It uses async I/O, // but the code in page_service sets up it own thread pool for that) - let conf = conf_copy.clone(); + let conf_copy = conf.clone(); let page_server_thread = thread::Builder::new() .name("Page Service thread".into()) - .spawn(|| { + .spawn(move || { // thread code - page_service::thread_main(conf); + page_service::thread_main(&conf_copy); }) .unwrap(); threads.push(page_server_thread); diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index c514292340..a453c10a73 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -113,7 +113,7 @@ lazy_static! { pub static ref PAGECACHES: Mutex>> = Mutex::new(HashMap::new()); } -pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc { +pub fn get_pagecache(conf: &PageServerConf, sys_id: u64) -> Arc { let mut pcaches = PAGECACHES.lock().unwrap(); if !pcaches.contains_key(&sys_id) { @@ -124,10 +124,11 @@ pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc { // Now join_handle is not saved any where and we won'try restart tharead // if it is dead. We may later stop that treads after some inactivity period // and restart them on demand. + let conf = conf.clone(); let _walredo_thread = thread::Builder::new() .name("WAL redo thread".into()) .spawn(move || { - walredo::wal_redo_main(conf, sys_id); + walredo::wal_redo_main(&conf, sys_id); }) .unwrap(); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index fcaf8c11f6..1feb4d6f12 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -215,7 +215,7 @@ impl FeMessage { /////////////////////////////////////////////////////////////////////////////// -pub fn thread_main(conf: PageServerConf) { +pub fn thread_main(conf: &PageServerConf) { // Create a new thread pool // // FIXME: keep it single-threaded for now, make it easier to debug with gdb, @@ -458,7 +458,7 @@ impl Connection { let _walreceiver_thread = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { - walreceiver::thread_main(conf_copy, &connstr); + walreceiver::thread_main(&conf_copy, &connstr); }) .unwrap(); @@ -503,7 +503,7 @@ impl Connection { self.stream.write_i16(0).await?; /* numAttributes */ self.stream.flush().await?; - let pcache = page_cache::get_pagecache(self.conf.clone(), sysid); + let pcache = page_cache::get_pagecache(&self.conf, sysid); loop { let message = self.read_message().await?; diff --git a/pageserver/src/restore_datadir.rs b/pageserver/src/restore_datadir.rs index d490e25dea..985f5e3905 100644 --- a/pageserver/src/restore_datadir.rs +++ b/pageserver/src/restore_datadir.rs @@ -80,7 +80,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> { } } - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); pcache.init_valid_lsn(control_lsn); info!("{} files to restore...", slurp_futures.len()); @@ -313,7 +313,7 @@ async fn slurp_base_file( // FIXME: use constants (BLCKSZ) let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); let reltag = page_cache::RelTag { spcnode: parsed.spcnode, diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index 08ba3e7fa3..17e045fb5b 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -119,7 +119,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { panic!("no base backup found"); } - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); pcache.init_valid_lsn(oldest_lsn); info!("{} files to restore...", slurp_futures.len()); @@ -305,7 +305,7 @@ async fn slurp_base_file( // FIXME: use constants (BLCKSZ) let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); while bytes.remaining() >= 8192 { let tag = page_cache::BufferTag { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 9f382b2efb..7ce3e9727f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -22,7 +22,7 @@ use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode}; // // This is the entry point for the WAL receiver thread. // -pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) { +pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) { info!("WAL receiver thread started: '{}'", wal_producer_connstr); let runtime = runtime::Builder::new_current_thread() @@ -32,7 +32,7 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) { runtime.block_on(async { loop { - let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await; + let _res = walreceiver_main(conf, wal_producer_connstr).await; // TODO: print/log the error info!( @@ -45,13 +45,13 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) { } async fn walreceiver_main( - conf: PageServerConf, - wal_producer_connstr: &String, + conf: &PageServerConf, + wal_producer_connstr: &str, ) -> Result<(), Error> { // Connect to the database in replication mode. debug!("connecting to {}...", wal_producer_connstr); let (mut rclient, connection) = connect_replication( - wal_producer_connstr.as_str(), + wal_producer_connstr, NoTls, ReplicationMode::Physical, ) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 685e771f4a..9d5a1ee143 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -41,7 +41,7 @@ static TIMEOUT: Duration = Duration::from_secs(20); // // Main entry point for the WAL applicator thread. // -pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { +pub fn wal_redo_main(conf: &PageServerConf, sys_id: u64) { info!("WAL redo thread started {}", sys_id); // We block on waiting for requests on the walredo request channel, but @@ -52,7 +52,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { .build() .unwrap(); - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); // Loop forever, handling requests as they come. let walredo_channel_receiver = &pcache.walredo_receiver;