cargo fmt

This commit is contained in:
Eric Seppanen
2021-04-28 10:01:58 -07:00
parent ab61ce2267
commit 975b2d12dc
5 changed files with 38 additions and 25 deletions

View File

@@ -229,7 +229,7 @@ fn test_acceptors_unavailability() {
.unwrap();
// Here we check that the query above was hanging
// while wal_acceptor was unavailiable
assert!(now.elapsed().unwrap().as_secs() >= 2*DOWNTIME);
assert!(now.elapsed().unwrap().as_secs() >= 2 * DOWNTIME);
psql.execute("INSERT INTO t values (5, 'payload')", &[])
.unwrap();

View File

@@ -160,7 +160,6 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
num_page_images: AtomicU64::new(0),
num_wal_records: AtomicU64::new(0),
num_getpage_requests: AtomicU64::new(0),
}
}
@@ -800,8 +799,11 @@ impl PageCache {
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self.walredo_mgr.request_redo(key.tag, key.lsn, base_img, records)?;
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
@@ -824,8 +826,11 @@ impl PageCache {
let v = iter.value().unwrap();
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
trace!("Reconstruct horizon page {:?}", key);
let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self.walredo_mgr.request_redo(key.tag, key.lsn, base_img, records)?;
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
truncated += 1;

View File

@@ -38,7 +38,7 @@ use crate::page_cache::BufferTag;
use crate::page_cache::WALRecord;
use crate::ZTimelineId;
use crate::{pg_constants, PageServerConf};
use postgres_ffi::xlog_utils::{XLogRecord};
use postgres_ffi::xlog_utils::XLogRecord;
static TIMEOUT: Duration = Duration::from_secs(20);
@@ -111,14 +111,22 @@ impl WalRedoManager {
})
.unwrap();
WalRedoManager { request_tx: Mutex::new(tx) }
WalRedoManager {
request_tx: Mutex::new(tx),
}
}
///
/// Request the WAL redo manager to apply WAL records, to reconstruct the page image
/// of the given page version.
///
pub fn request_redo(&self, tag: BufferTag, lsn: Lsn, base_img: Option<Bytes>, records: Vec<WALRecord>) -> Result<Bytes, WalRedoError> {
pub fn request_redo(
&self,
tag: BufferTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();

View File

@@ -40,7 +40,7 @@ pub struct FeStartupMessage {
pub version: u32,
pub kind: StartupRequestCode,
pub timelineid: ZTimelineId,
pub appname: Option<String>,
pub appname: Option<String>,
}
#[derive(Debug)]
@@ -87,7 +87,7 @@ impl FeStartupMessage {
let params = params_str.split('\0');
let mut options = false;
let mut timelineid: Option<ZTimelineId> = None;
let mut appname: Option<String> = None;
let mut appname: Option<String> = None;
for p in params {
if p == "options" {
options = true;
@@ -97,8 +97,8 @@ impl FeStartupMessage {
// FIXME: rethrow parsing error, don't unwrap
timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap());
} else if let Some(val) = opt.strip_prefix("application_name=") {
appname = Some(val.to_string());
}
appname = Some(val.to_string());
}
}
break;
}
@@ -114,7 +114,7 @@ impl FeStartupMessage {
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
appname,
appname,
timelineid: timelineid.unwrap(),
})))
}

View File

@@ -158,12 +158,12 @@ pub struct Timeline {
#[derive(Debug)]
struct Connection {
timeline: Option<Arc<Timeline>>,
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
init_done: bool, /* startup packet proceeded */
appname: Option<String>,/* assigned application name */
conf: WalAcceptorConf, /* wal acceptor configuration */
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
init_done: bool, /* startup packet proceeded */
appname: Option<String>, /* assigned application name */
conf: WalAcceptorConf, /* wal acceptor configuration */
}
/*
@@ -544,7 +544,7 @@ impl Connection {
inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024),
init_done: false,
appname: None,
appname: None,
conf: conf.clone(),
}
}
@@ -857,7 +857,7 @@ impl Connection {
self.send().await?;
self.init_done = true;
self.set_timeline(m.timelineid)?;
self.appname = m.appname;
self.appname = m.appname;
}
StartupRequestCode::Cancel => return Ok(()),
}
@@ -954,9 +954,9 @@ impl Connection {
if start_pos == 0 {
start_pos = wal_end;
}
if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) {
stop_pos = wal_end;
}
if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) {
stop_pos = wal_end;
}
info!(
"Start replication from {:X}/{:>08X} till {:X}/{:>08X}",
(start_pos >> 32) as u32,