diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 2c1d964267..f49cf2061b 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -229,7 +229,7 @@ fn test_acceptors_unavailability() { .unwrap(); // Here we check that the query above was hanging // while wal_acceptor was unavailiable - assert!(now.elapsed().unwrap().as_secs() >= 2*DOWNTIME); + assert!(now.elapsed().unwrap().as_secs() >= 2 * DOWNTIME); psql.execute("INSERT INTO t values (5, 'payload')", &[]) .unwrap(); diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 9acf4982fd..fe35226c29 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -160,7 +160,6 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache num_page_images: AtomicU64::new(0), num_wal_records: AtomicU64::new(0), num_getpage_requests: AtomicU64::new(0), - } } @@ -800,8 +799,11 @@ impl PageCache { if (v[0] & PAGE_IMAGE_FLAG) == 0 { trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version - let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn); - let new_img = self.walredo_mgr.request_redo(key.tag, key.lsn, base_img, records)?; + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; self.put_page_image(key.tag, key.lsn, new_img.clone()); @@ -824,8 +826,11 @@ impl PageCache { let v = iter.value().unwrap(); if (v[0] & PAGE_IMAGE_FLAG) == 0 { trace!("Reconstruct horizon page {:?}", key); - let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn); - let new_img = self.walredo_mgr.request_redo(key.tag, key.lsn, base_img, records)?; + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; self.put_page_image(key.tag, key.lsn, new_img.clone()); truncated += 1; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 70858bc9f0..229e1c9516 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -38,7 +38,7 @@ use crate::page_cache::BufferTag; use crate::page_cache::WALRecord; use crate::ZTimelineId; use crate::{pg_constants, PageServerConf}; -use postgres_ffi::xlog_utils::{XLogRecord}; +use postgres_ffi::xlog_utils::XLogRecord; static TIMEOUT: Duration = Duration::from_secs(20); @@ -111,14 +111,22 @@ impl WalRedoManager { }) .unwrap(); - WalRedoManager { request_tx: Mutex::new(tx) } + WalRedoManager { + request_tx: Mutex::new(tx), + } } /// /// Request the WAL redo manager to apply WAL records, to reconstruct the page image /// of the given page version. /// - pub fn request_redo(&self, tag: BufferTag, lsn: Lsn, base_img: Option, records: Vec) -> Result { + pub fn request_redo( + &self, + tag: BufferTag, + lsn: Lsn, + base_img: Option, + records: Vec, + ) -> Result { // Create a channel where to receive the response let (tx, rx) = mpsc::channel::>(); diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index e959548a10..97fae0936c 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -40,7 +40,7 @@ pub struct FeStartupMessage { pub version: u32, pub kind: StartupRequestCode, pub timelineid: ZTimelineId, - pub appname: Option, + pub appname: Option, } #[derive(Debug)] @@ -87,7 +87,7 @@ impl FeStartupMessage { let params = params_str.split('\0'); let mut options = false; let mut timelineid: Option = None; - let mut appname: Option = None; + let mut appname: Option = None; for p in params { if p == "options" { options = true; @@ -97,8 +97,8 @@ impl FeStartupMessage { // FIXME: rethrow parsing error, don't unwrap timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap()); } else if let Some(val) = opt.strip_prefix("application_name=") { - appname = Some(val.to_string()); - } + appname = Some(val.to_string()); + } } break; } @@ -114,7 +114,7 @@ impl FeStartupMessage { Ok(Some(FeMessage::StartupMessage(FeStartupMessage { version, kind, - appname, + appname, timelineid: timelineid.unwrap(), }))) } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index e9f58cfb0d..7e64a91eb5 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -158,12 +158,12 @@ pub struct Timeline { #[derive(Debug)] struct Connection { timeline: Option>, - stream: TcpStream, /* Postgres connection */ - inbuf: BytesMut, /* input buffer */ - outbuf: BytesMut, /* output buffer */ - init_done: bool, /* startup packet proceeded */ - appname: Option,/* assigned application name */ - conf: WalAcceptorConf, /* wal acceptor configuration */ + stream: TcpStream, /* Postgres connection */ + inbuf: BytesMut, /* input buffer */ + outbuf: BytesMut, /* output buffer */ + init_done: bool, /* startup packet proceeded */ + appname: Option, /* assigned application name */ + conf: WalAcceptorConf, /* wal acceptor configuration */ } /* @@ -544,7 +544,7 @@ impl Connection { inbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024), init_done: false, - appname: None, + appname: None, conf: conf.clone(), } } @@ -857,7 +857,7 @@ impl Connection { self.send().await?; self.init_done = true; self.set_timeline(m.timelineid)?; - self.appname = m.appname; + self.appname = m.appname; } StartupRequestCode::Cancel => return Ok(()), } @@ -954,9 +954,9 @@ impl Connection { if start_pos == 0 { start_pos = wal_end; } - if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) { - stop_pos = wal_end; - } + if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) { + stop_pos = wal_end; + } info!( "Start replication from {:X}/{:>08X} till {:X}/{:>08X}", (start_pos >> 32) as u32,