mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Prefer passing PageServerConf by reference.
It seems more idiomatic Rust.
This commit is contained in:
@@ -98,10 +98,10 @@ fn main() -> Result<()> {
|
||||
conf.listen_addr = addr.parse().unwrap();
|
||||
}
|
||||
|
||||
start_pageserver(conf)
|
||||
start_pageserver(&conf)
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: PageServerConf) -> Result<()> {
|
||||
fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
// Initialize logger
|
||||
let _scope_guard = init_logging(&conf)?;
|
||||
let _log_guard = slog_stdlog::init().unwrap();
|
||||
@@ -186,13 +186,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<()> {
|
||||
//
|
||||
// All other wal receivers are started on demand by "callmemaybe" command
|
||||
// sent to pageserver.
|
||||
let conf_copy = conf.clone();
|
||||
if let Some(wal_producer) = conf.wal_producer_connstr {
|
||||
let conf = conf_copy.clone();
|
||||
if let Some(wal_producer) = &conf.wal_producer_connstr {
|
||||
let conf_copy = conf.clone();
|
||||
let wal_producer = wal_producer.clone();
|
||||
let walreceiver_thread = thread::Builder::new()
|
||||
.name("static WAL receiver thread".into())
|
||||
.spawn(move || {
|
||||
walreceiver::thread_main(conf, &wal_producer);
|
||||
walreceiver::thread_main(&conf_copy, &wal_producer);
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(walreceiver_thread);
|
||||
@@ -200,12 +200,12 @@ fn start_pageserver(conf: PageServerConf) -> Result<()> {
|
||||
|
||||
// GetPage@LSN requests are served by another thread. (It uses async I/O,
|
||||
// but the code in page_service sets up it own thread pool for that)
|
||||
let conf = conf_copy.clone();
|
||||
let conf_copy = conf.clone();
|
||||
let page_server_thread = thread::Builder::new()
|
||||
.name("Page Service thread".into())
|
||||
.spawn(|| {
|
||||
.spawn(move || {
|
||||
// thread code
|
||||
page_service::thread_main(conf);
|
||||
page_service::thread_main(&conf_copy);
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(page_server_thread);
|
||||
|
||||
@@ -113,7 +113,7 @@ lazy_static! {
|
||||
pub static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
|
||||
pub fn get_pagecache(conf: &PageServerConf, sys_id: u64) -> Arc<PageCache> {
|
||||
let mut pcaches = PAGECACHES.lock().unwrap();
|
||||
|
||||
if !pcaches.contains_key(&sys_id) {
|
||||
@@ -124,10 +124,11 @@ pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
|
||||
// Now join_handle is not saved any where and we won'try restart tharead
|
||||
// if it is dead. We may later stop that treads after some inactivity period
|
||||
// and restart them on demand.
|
||||
let conf = conf.clone();
|
||||
let _walredo_thread = thread::Builder::new()
|
||||
.name("WAL redo thread".into())
|
||||
.spawn(move || {
|
||||
walredo::wal_redo_main(conf, sys_id);
|
||||
walredo::wal_redo_main(&conf, sys_id);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -215,7 +215,7 @@ impl FeMessage {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub fn thread_main(conf: PageServerConf) {
|
||||
pub fn thread_main(conf: &PageServerConf) {
|
||||
// Create a new thread pool
|
||||
//
|
||||
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
|
||||
@@ -458,7 +458,7 @@ impl Connection {
|
||||
let _walreceiver_thread = thread::Builder::new()
|
||||
.name("WAL receiver thread".into())
|
||||
.spawn(move || {
|
||||
walreceiver::thread_main(conf_copy, &connstr);
|
||||
walreceiver::thread_main(&conf_copy, &connstr);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -503,7 +503,7 @@ impl Connection {
|
||||
self.stream.write_i16(0).await?; /* numAttributes */
|
||||
self.stream.flush().await?;
|
||||
|
||||
let pcache = page_cache::get_pagecache(self.conf.clone(), sysid);
|
||||
let pcache = page_cache::get_pagecache(&self.conf, sysid);
|
||||
|
||||
loop {
|
||||
let message = self.read_message().await?;
|
||||
|
||||
@@ -80,7 +80,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> {
|
||||
}
|
||||
}
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
pcache.init_valid_lsn(control_lsn);
|
||||
|
||||
info!("{} files to restore...", slurp_futures.len());
|
||||
@@ -313,7 +313,7 @@ async fn slurp_base_file(
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
|
||||
let reltag = page_cache::RelTag {
|
||||
spcnode: parsed.spcnode,
|
||||
|
||||
@@ -119,7 +119,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
||||
panic!("no base backup found");
|
||||
}
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
pcache.init_valid_lsn(oldest_lsn);
|
||||
|
||||
info!("{} files to restore...", slurp_futures.len());
|
||||
@@ -305,7 +305,7 @@ async fn slurp_base_file(
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
|
||||
while bytes.remaining() >= 8192 {
|
||||
let tag = page_cache::BufferTag {
|
||||
|
||||
@@ -22,7 +22,7 @@ use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
|
||||
//
|
||||
// This is the entry point for the WAL receiver thread.
|
||||
//
|
||||
pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
|
||||
pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) {
|
||||
info!("WAL receiver thread started: '{}'", wal_producer_connstr);
|
||||
|
||||
let runtime = runtime::Builder::new_current_thread()
|
||||
@@ -32,7 +32,7 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
|
||||
|
||||
runtime.block_on(async {
|
||||
loop {
|
||||
let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
|
||||
let _res = walreceiver_main(conf, wal_producer_connstr).await;
|
||||
|
||||
// TODO: print/log the error
|
||||
info!(
|
||||
@@ -45,13 +45,13 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
|
||||
}
|
||||
|
||||
async fn walreceiver_main(
|
||||
conf: PageServerConf,
|
||||
wal_producer_connstr: &String,
|
||||
conf: &PageServerConf,
|
||||
wal_producer_connstr: &str,
|
||||
) -> Result<(), Error> {
|
||||
// Connect to the database in replication mode.
|
||||
debug!("connecting to {}...", wal_producer_connstr);
|
||||
let (mut rclient, connection) = connect_replication(
|
||||
wal_producer_connstr.as_str(),
|
||||
wal_producer_connstr,
|
||||
NoTls,
|
||||
ReplicationMode::Physical,
|
||||
)
|
||||
|
||||
@@ -41,7 +41,7 @@ static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
//
|
||||
// Main entry point for the WAL applicator thread.
|
||||
//
|
||||
pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
|
||||
pub fn wal_redo_main(conf: &PageServerConf, sys_id: u64) {
|
||||
info!("WAL redo thread started {}", sys_id);
|
||||
|
||||
// We block on waiting for requests on the walredo request channel, but
|
||||
@@ -52,7 +52,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
|
||||
// Loop forever, handling requests as they come.
|
||||
let walredo_channel_receiver = &pcache.walredo_receiver;
|
||||
|
||||
Reference in New Issue
Block a user