mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
clippy cleanup #2
- remove needless return - remove needless format! - remove a few more needless clone() - from_str_radix(_, 10) -> .parse() - remove needless reference - remove needless `mut` Also manually replaced a match statement with map_err() because after clippy was done with it, there was almost nothing left in the match expression.
This commit is contained in:
@@ -11,7 +11,6 @@ use std::{collections::BTreeMap, path::PathBuf};
|
||||
use anyhow::{Context, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use tar;
|
||||
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
@@ -290,7 +289,7 @@ impl PostgresNode {
|
||||
// slot or something proper, to prevent the compute node
|
||||
// from removing WAL that hasn't been streamed to the safekeepr or
|
||||
// page server yet. But this will do for now.
|
||||
self.append_conf("postgresql.conf", &format!("wal_keep_size='10TB'\n"));
|
||||
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n");
|
||||
|
||||
// Connect it to the page server.
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ pub fn init() -> Result<()> {
|
||||
|
||||
// ok, we are good to go
|
||||
let mut conf = LocalEnv {
|
||||
repo_path: repo_path.clone(),
|
||||
repo_path,
|
||||
pg_distrib_dir,
|
||||
zenith_distrib_dir,
|
||||
systemid: 0,
|
||||
@@ -254,7 +254,7 @@ pub fn test_env(testname: &str) -> LocalEnv {
|
||||
systemid: 0,
|
||||
};
|
||||
init_repo(&mut local_env).expect("could not initialize zenith repository");
|
||||
return local_env;
|
||||
local_env
|
||||
}
|
||||
|
||||
// Find the directory where the binaries were put (i.e. target/debug/)
|
||||
@@ -266,7 +266,7 @@ pub fn cargo_bin_dir() -> PathBuf {
|
||||
pathbuf.pop();
|
||||
}
|
||||
|
||||
return pathbuf;
|
||||
pathbuf
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -358,7 +358,7 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u6
|
||||
|
||||
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
|
||||
|
||||
return Ok(lsn);
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
// Find the latest snapshot for a timeline
|
||||
|
||||
@@ -56,7 +56,7 @@ impl TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: pserver,
|
||||
test_done: AtomicBool::new(false),
|
||||
repopath: repopath,
|
||||
repopath,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ impl TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: pserver,
|
||||
test_done: AtomicBool::new(false),
|
||||
repopath: repopath,
|
||||
repopath,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ impl TestStorageControlPlane {
|
||||
listen_address: None,
|
||||
}),
|
||||
test_done: AtomicBool::new(false),
|
||||
repopath: repopath,
|
||||
repopath,
|
||||
};
|
||||
cplane.pageserver.start().unwrap();
|
||||
|
||||
@@ -233,7 +233,7 @@ impl PageServerNode {
|
||||
if !status.success() {
|
||||
anyhow::bail!("Failed to stop pageserver with pid {}", pid);
|
||||
} else {
|
||||
return Ok(());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ pub fn send_snapshot_tarball(
|
||||
continue;
|
||||
}
|
||||
|
||||
let archive_fname = relpath.to_str().unwrap().clone();
|
||||
let archive_fname = relpath.to_str().unwrap();
|
||||
let archive_fname = archive_fname
|
||||
.strip_suffix(".partial")
|
||||
.unwrap_or(&archive_fname);
|
||||
@@ -148,7 +148,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
|
||||
};
|
||||
|
||||
return Ok((relnode, forknum, segno));
|
||||
Ok((relnode, forknum, segno))
|
||||
}
|
||||
|
||||
fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
|
||||
@@ -172,7 +172,7 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
|
||||
if let Some(fname) = path.strip_prefix("global/") {
|
||||
let (_relnode, _forknum, _segno) = parse_filename(fname)?;
|
||||
|
||||
return Ok(());
|
||||
Ok(())
|
||||
} else if let Some(dbpath) = path.strip_prefix("base/") {
|
||||
let mut s = dbpath.split("/");
|
||||
let dbnode_str = s
|
||||
@@ -188,15 +188,15 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
|
||||
|
||||
let (_relnode, _forknum, _segno) = parse_filename(fname)?;
|
||||
|
||||
return Ok(());
|
||||
Ok(())
|
||||
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
|
||||
// TODO
|
||||
return Err(FilePathError::new("tablespaces not supported"));
|
||||
Err(FilePathError::new("tablespaces not supported"))
|
||||
} else {
|
||||
return Err(FilePathError::new("invalid relation data file name"));
|
||||
Err(FilePathError::new("invalid relation data file name"))
|
||||
}
|
||||
}
|
||||
|
||||
fn is_rel_file_path(path: &str) -> bool {
|
||||
return parse_rel_file_path(path).is_ok();
|
||||
parse_rel_file_path(path).is_ok()
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
.with_context(|| format!("failed to open {:?}", &log_filename))?;
|
||||
|
||||
let daemonize = Daemonize::new()
|
||||
.pid_file(repodir.clone().join("pageserver.pid"))
|
||||
.pid_file(repodir.join("pageserver.pid"))
|
||||
.working_directory(repodir)
|
||||
.stdout(stdout)
|
||||
.stderr(stderr);
|
||||
@@ -197,7 +197,7 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
|
||||
if record.level().is_at_least(slog::Level::Debug) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
});
|
||||
let drain = std::sync::Mutex::new(drain).fuse();
|
||||
let logger = slog::Logger::root(drain, slog::o!());
|
||||
@@ -215,7 +215,7 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
let logger = slog::Logger::root(drain, slog::o!());
|
||||
|
||||
@@ -154,7 +154,7 @@ pub fn get_or_restore_pagecache(
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
return Ok(result);
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -412,7 +412,7 @@ impl PageCache {
|
||||
tag.blknum
|
||||
);
|
||||
|
||||
return Ok(page_img);
|
||||
Ok(page_img)
|
||||
}
|
||||
|
||||
//
|
||||
@@ -467,7 +467,7 @@ impl PageCache {
|
||||
}
|
||||
|
||||
records.reverse();
|
||||
return (base_img, records);
|
||||
(base_img, records)
|
||||
}
|
||||
|
||||
//
|
||||
@@ -610,7 +610,7 @@ impl PageCache {
|
||||
pub fn get_last_valid_lsn(&self) -> u64 {
|
||||
let shared = self.shared.lock().unwrap();
|
||||
|
||||
return shared.last_record_lsn;
|
||||
shared.last_record_lsn
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -189,12 +189,11 @@ fn read_null_terminated(buf: &mut Bytes) -> Result<Bytes> {
|
||||
}
|
||||
result.put_u8(byte);
|
||||
}
|
||||
return Ok(result.freeze());
|
||||
Ok(result.freeze())
|
||||
}
|
||||
|
||||
impl FeParseMessage {
|
||||
pub fn parse(body: Bytes) -> Result<FeMessage> {
|
||||
let mut buf = body.clone();
|
||||
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
|
||||
let _pstmt_name = read_null_terminated(&mut buf)?;
|
||||
let query_string = read_null_terminated(&mut buf)?;
|
||||
let nparams = buf.get_i16();
|
||||
@@ -230,8 +229,7 @@ struct FeDescribeMessage {
|
||||
}
|
||||
|
||||
impl FeDescribeMessage {
|
||||
pub fn parse(body: Bytes) -> Result<FeMessage> {
|
||||
let mut buf = body.clone();
|
||||
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
|
||||
let kind = buf.get_u8();
|
||||
let _pstmt_name = read_null_terminated(&mut buf)?;
|
||||
|
||||
@@ -264,8 +262,7 @@ struct FeExecuteMessage {
|
||||
}
|
||||
|
||||
impl FeExecuteMessage {
|
||||
pub fn parse(body: Bytes) -> Result<FeMessage> {
|
||||
let mut buf = body.clone();
|
||||
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
|
||||
let portal_name = read_null_terminated(&mut buf)?;
|
||||
let maxrows = buf.get_i32();
|
||||
|
||||
@@ -292,8 +289,7 @@ impl FeExecuteMessage {
|
||||
struct FeBindMessage {}
|
||||
|
||||
impl FeBindMessage {
|
||||
pub fn parse(body: Bytes) -> Result<FeMessage> {
|
||||
let mut buf = body.clone();
|
||||
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
|
||||
let portal_name = read_null_terminated(&mut buf)?;
|
||||
let _pstmt_name = read_null_terminated(&mut buf)?;
|
||||
|
||||
@@ -323,8 +319,7 @@ impl FeBindMessage {
|
||||
struct FeCloseMessage {}
|
||||
|
||||
impl FeCloseMessage {
|
||||
pub fn parse(body: Bytes) -> Result<FeMessage> {
|
||||
let mut buf = body.clone();
|
||||
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
|
||||
let _kind = buf.get_u8();
|
||||
let _pstmt_or_portal_name = read_null_terminated(&mut buf)?;
|
||||
|
||||
@@ -365,7 +360,7 @@ impl FeMessage {
|
||||
let mut body = body.freeze();
|
||||
|
||||
match tag {
|
||||
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body: body }))),
|
||||
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body }))),
|
||||
b'P' => Ok(Some(FeParseMessage::parse(body)?)),
|
||||
b'D' => Ok(Some(FeDescribeMessage::parse(body)?)),
|
||||
b'E' => Ok(Some(FeExecuteMessage::parse(body)?)),
|
||||
@@ -430,7 +425,7 @@ pub fn thread_main(conf: &PageServerConf) {
|
||||
|
||||
let runtime_ref = Arc::new(runtime);
|
||||
|
||||
runtime_ref.clone().block_on(async {
|
||||
runtime_ref.block_on(async {
|
||||
let listener = TcpListener::bind(conf.listen_addr).await.unwrap();
|
||||
|
||||
loop {
|
||||
@@ -540,7 +535,7 @@ impl Connection {
|
||||
|
||||
BeMessage::RowDescription => {
|
||||
// XXX
|
||||
let mut b = Bytes::from("data\0");
|
||||
let b = Bytes::from("data\0");
|
||||
|
||||
self.stream.write_u8(b'T').await?;
|
||||
self.stream
|
||||
@@ -548,7 +543,7 @@ impl Connection {
|
||||
.await?;
|
||||
|
||||
self.stream.write_i16(1).await?;
|
||||
self.stream.write_all(&mut b).await?;
|
||||
self.stream.write_all(&b).await?;
|
||||
self.stream.write_i32(0).await?; /* table oid */
|
||||
self.stream.write_i16(0).await?; /* attnum */
|
||||
self.stream.write_i32(25).await?; /* TEXTOID */
|
||||
@@ -560,34 +555,34 @@ impl Connection {
|
||||
// XXX: accept some text data
|
||||
BeMessage::DataRow => {
|
||||
// XXX
|
||||
let mut b = Bytes::from("hello world");
|
||||
let b = Bytes::from("hello world");
|
||||
|
||||
self.stream.write_u8(b'D').await?;
|
||||
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
|
||||
|
||||
self.stream.write_i16(1).await?;
|
||||
self.stream.write_i32(b.len() as i32).await?;
|
||||
self.stream.write_all(&mut b).await?;
|
||||
self.stream.write_all(&b).await?;
|
||||
}
|
||||
|
||||
BeMessage::ControlFile => {
|
||||
// TODO pass checkpoint and xid info in this message
|
||||
let mut b = Bytes::from("hello pg_control");
|
||||
let b = Bytes::from("hello pg_control");
|
||||
|
||||
self.stream.write_u8(b'D').await?;
|
||||
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
|
||||
|
||||
self.stream.write_i16(1).await?;
|
||||
self.stream.write_i32(b.len() as i32).await?;
|
||||
self.stream.write_all(&mut b).await?;
|
||||
self.stream.write_all(&b).await?;
|
||||
}
|
||||
|
||||
BeMessage::CommandComplete => {
|
||||
let mut b = Bytes::from("SELECT 1\0");
|
||||
let b = Bytes::from("SELECT 1\0");
|
||||
|
||||
self.stream.write_u8(b'C').await?;
|
||||
self.stream.write_i32(4 + b.len() as i32).await?;
|
||||
self.stream.write_all(&mut b).await?;
|
||||
self.stream.write_all(&b).await?;
|
||||
}
|
||||
|
||||
BeMessage::ZenithStatusResponse(resp) => {
|
||||
@@ -614,7 +609,7 @@ impl Connection {
|
||||
self.stream.write_u8(102).await?; /* tag from pagestore_client.h */
|
||||
self.stream.write_u8(resp.ok as u8).await?;
|
||||
self.stream.write_u32(resp.n_blocks).await?;
|
||||
self.stream.write_all(&mut resp.page.clone()).await?;
|
||||
self.stream.write_all(&resp.page.clone()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -637,8 +632,8 @@ impl Connection {
|
||||
|
||||
match m.kind {
|
||||
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
||||
let mut b = Bytes::from("N");
|
||||
self.stream.write_all(&mut b).await?;
|
||||
let b = Bytes::from("N");
|
||||
self.stream.write_all(&b).await?;
|
||||
self.stream.flush().await?;
|
||||
}
|
||||
StartupRequestCode::Normal => {
|
||||
@@ -730,7 +725,7 @@ impl Connection {
|
||||
let caps = re.captures(&query_str);
|
||||
let caps = caps.unwrap();
|
||||
|
||||
let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str().clone()).unwrap();
|
||||
let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str()).unwrap();
|
||||
let connstr: String = String::from(caps.get(2).unwrap().as_str());
|
||||
|
||||
// Check that the timeline exists
|
||||
@@ -952,7 +947,7 @@ impl Connection {
|
||||
joinres.unwrap_err(),
|
||||
));
|
||||
}
|
||||
return joinres.unwrap();
|
||||
joinres.unwrap()
|
||||
};
|
||||
|
||||
let f_pump = async move {
|
||||
@@ -961,12 +956,12 @@ impl Connection {
|
||||
if buf.is_none() {
|
||||
break;
|
||||
}
|
||||
let mut buf = buf.unwrap();
|
||||
let buf = buf.unwrap();
|
||||
|
||||
// CopyData
|
||||
stream.write_u8(b'd').await?;
|
||||
stream.write_u32((4 + buf.len()) as u32).await?;
|
||||
stream.write_all(&mut buf).await?;
|
||||
stream.write_all(&buf).await?;
|
||||
trace!("CopyData sent for {} bytes!", buf.len());
|
||||
|
||||
// FIXME: flush isn't really required, but makes it easier
|
||||
|
||||
@@ -205,9 +205,9 @@ fn restore_relfile(
|
||||
let tag = page_cache::BufferTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode: relnode,
|
||||
relnode,
|
||||
forknum: forknum as u8,
|
||||
blknum: blknum,
|
||||
blknum,
|
||||
};
|
||||
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
|
||||
/*
|
||||
@@ -236,7 +236,7 @@ fn restore_relfile(
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode: relnode,
|
||||
relnode,
|
||||
forknum: forknum as u8,
|
||||
};
|
||||
pcache.relsize_inc(&tag, blknum);
|
||||
@@ -254,7 +254,7 @@ fn restore_wal(
|
||||
) -> Result<()> {
|
||||
let walpath = format!("timelines/{}/wal", timeline);
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
|
||||
let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024);
|
||||
let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024);
|
||||
@@ -315,7 +315,7 @@ fn restore_wal(
|
||||
};
|
||||
|
||||
let rec = page_cache::WALRecord {
|
||||
lsn: lsn,
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset,
|
||||
@@ -485,5 +485,5 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
|
||||
};
|
||||
|
||||
return Ok((relnode, forknum, segno));
|
||||
Ok((relnode, forknum, segno))
|
||||
}
|
||||
|
||||
@@ -38,12 +38,9 @@ pub fn restore_main(conf: &PageServerConf) {
|
||||
let result = restore_chunk(conf).await;
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
return;
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!("S3 error: {}", err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -199,7 +196,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
||||
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
|
||||
|
||||
let relnode_str = caps.name("relnode").unwrap().as_str();
|
||||
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
||||
let relnode: u32 = relnode_str.parse()?;
|
||||
|
||||
let forkname_match = caps.name("forkname");
|
||||
let forkname = if forkname_match.is_none() {
|
||||
@@ -213,14 +210,14 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
||||
let segno = if segno_match.is_none() {
|
||||
0
|
||||
} else {
|
||||
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
|
||||
segno_match.unwrap().as_str().parse::<u32>()?
|
||||
};
|
||||
|
||||
let lsn_hi = u64::from_str_radix(caps.name("lsnhi").unwrap().as_str(), 16)?;
|
||||
let lsn_lo = u64::from_str_radix(caps.name("lsnlo").unwrap().as_str(), 16)?;
|
||||
let lsn_hi: u64 = caps.name("lsnhi").unwrap().as_str().parse()?;
|
||||
let lsn_lo: u64 = caps.name("lsnlo").unwrap().as_str().parse()?;
|
||||
let lsn = lsn_hi << 32 | lsn_lo;
|
||||
|
||||
return Ok((relnode, forknum, segno, lsn));
|
||||
Ok((relnode, forknum, segno, lsn))
|
||||
}
|
||||
|
||||
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
||||
@@ -244,20 +241,20 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
||||
if let Some(fname) = path.strip_prefix("global/") {
|
||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
Ok(ParsedBaseImageFileName {
|
||||
spcnode: GLOBALTABLESPACE_OID,
|
||||
dbnode: 0,
|
||||
relnode,
|
||||
forknum,
|
||||
segno,
|
||||
lsn,
|
||||
});
|
||||
})
|
||||
} else if let Some(dbpath) = path.strip_prefix("base/") {
|
||||
let mut s = dbpath.split("/");
|
||||
let dbnode_str = s
|
||||
.next()
|
||||
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
|
||||
let dbnode = u32::from_str_radix(dbnode_str, 10)?;
|
||||
let dbnode: u32 = dbnode_str.parse()?;
|
||||
let fname = s
|
||||
.next()
|
||||
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
|
||||
@@ -267,19 +264,19 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
||||
|
||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
Ok(ParsedBaseImageFileName {
|
||||
spcnode: DEFAULTTABLESPACE_OID,
|
||||
dbnode,
|
||||
relnode,
|
||||
forknum,
|
||||
segno,
|
||||
lsn,
|
||||
});
|
||||
})
|
||||
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
|
||||
// TODO
|
||||
return Err(FilePathError::new("tablespaces not supported"));
|
||||
Err(FilePathError::new("tablespaces not supported"))
|
||||
} else {
|
||||
return Err(FilePathError::new("invalid relation data file name"));
|
||||
Err(FilePathError::new("invalid relation data file name"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
|
||||
@@ -41,7 +41,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
|
||||
@@ -52,7 +52,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
|
||||
@@ -65,7 +65,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
|
||||
@@ -84,11 +84,11 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
false
|
||||
})
|
||||
.fuse();
|
||||
let logger = slog::Logger::root(drain, slog::o!());
|
||||
return slog_scope::set_global_logger(logger);
|
||||
slog_scope::set_global_logger(logger)
|
||||
}
|
||||
|
||||
pub fn ui_main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
@@ -76,8 +76,8 @@ impl Events {
|
||||
};
|
||||
Events {
|
||||
rx,
|
||||
ignore_exit_key,
|
||||
input_handle,
|
||||
ignore_exit_key,
|
||||
tick_handle,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ impl Drain for TuiLogger {
|
||||
events.pop_back();
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -227,7 +227,7 @@ impl WalStreamDecoder {
|
||||
// FIXME: check that hdr.xlp_rem_len matches self.contlen
|
||||
//println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len);
|
||||
|
||||
return hdr;
|
||||
hdr
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -239,7 +239,7 @@ impl WalStreamDecoder {
|
||||
xlp_xlog_blcksz: self.inputbuf.get_u32_le(),
|
||||
};
|
||||
|
||||
return hdr;
|
||||
hdr
|
||||
}
|
||||
}
|
||||
|
||||
@@ -350,7 +350,7 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
|
||||
buf.advance(2); // 2 bytes of padding
|
||||
let _xl_crc = buf.get_u32_le();
|
||||
|
||||
return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID;
|
||||
xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -651,6 +651,6 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
DecodedWALRecord {
|
||||
record: rec,
|
||||
blocks,
|
||||
main_data_offset: main_data_offset,
|
||||
main_data_offset,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -409,7 +409,7 @@ fn write_wal_file(
|
||||
let mut bytes_written: usize = 0;
|
||||
let mut partial;
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &'static [u8] = &[0u8; XLOG_BLCKSZ];
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));
|
||||
|
||||
|
||||
@@ -224,7 +224,7 @@ fn handle_apply_request(
|
||||
// Wake up the requester, whether the operation succeeded or not.
|
||||
entry_rc.walredo_condvar.notify_all();
|
||||
|
||||
return result;
|
||||
result
|
||||
}
|
||||
|
||||
struct WalRedoProcess {
|
||||
@@ -317,7 +317,7 @@ impl WalRedoProcess {
|
||||
) -> Result<Bytes, Error> {
|
||||
let mut stdin = self.stdin.borrow_mut();
|
||||
let mut stdout = self.stdout.borrow_mut();
|
||||
return runtime.block_on(async {
|
||||
runtime.block_on(async {
|
||||
//
|
||||
// This async block sends all the commands to the process.
|
||||
//
|
||||
@@ -380,7 +380,7 @@ impl WalRedoProcess {
|
||||
let buf = res.0;
|
||||
|
||||
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
|
||||
});
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +398,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
return buf.freeze();
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
@@ -418,7 +418,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
return buf.freeze();
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
|
||||
@@ -432,7 +432,7 @@ fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
return buf.freeze();
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
@@ -449,5 +449,5 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
return buf.freeze();
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
@@ -18,13 +18,13 @@ impl ControlFileData {
|
||||
controlfile =
|
||||
unsafe { std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) };
|
||||
|
||||
return controlfile;
|
||||
controlfile
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode_pg_control(buf: Bytes) -> Result<ControlFileData, anyhow::Error> {
|
||||
pub fn decode_pg_control(mut buf: Bytes) -> Result<ControlFileData, anyhow::Error> {
|
||||
let mut b: [u8; SIZEOF_CONTROLDATA] = [0u8; SIZEOF_CONTROLDATA];
|
||||
buf.clone().copy_to_slice(&mut b);
|
||||
buf.copy_to_slice(&mut b);
|
||||
|
||||
let controlfile: ControlFileData;
|
||||
|
||||
@@ -63,5 +63,5 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
|
||||
// Fill the rest of the control file with zeros.
|
||||
buf.resize(PG_CONTROL_FILE_SIZE as usize, 0);
|
||||
|
||||
return buf.into();
|
||||
buf.into()
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let mut conf = WalAcceptorConf {
|
||||
data_dir: PathBuf::from("./"),
|
||||
systemid: systemid,
|
||||
systemid,
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
pageserver_addr: None,
|
||||
|
||||
@@ -444,7 +444,7 @@ impl Timeline {
|
||||
|
||||
fn get_hs_feedback(&self) -> HotStandbyFeedback {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
return shared_state.hs_feedback;
|
||||
shared_state.hs_feedback
|
||||
}
|
||||
|
||||
// Load and lock control file (prevent running more than one instance of safekeeper)
|
||||
@@ -527,7 +527,7 @@ impl Timeline {
|
||||
|
||||
let file = shared_state.control_file.as_mut().unwrap();
|
||||
file.seek(SeekFrom::Start(0))?;
|
||||
file.write_all(&mut buf[..])?;
|
||||
file.write_all(&buf[..])?;
|
||||
if sync {
|
||||
file.sync_all()?;
|
||||
}
|
||||
|
||||
@@ -23,17 +23,17 @@ pub type XLogSegNo = u64;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
|
||||
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
|
||||
(xlogptr as u32) & (wal_segsz_bytes as u32 - 1)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
|
||||
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return xlogptr / wal_segsz_bytes as u64;
|
||||
xlogptr / wal_segsz_bytes as u64
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -42,7 +42,7 @@ pub fn XLogSegNoOffsetToRecPtr(
|
||||
offset: u32,
|
||||
wal_segsz_bytes: usize,
|
||||
) -> XLogRecPtr {
|
||||
return segno * (wal_segsz_bytes as u64) + (offset as u64);
|
||||
segno * (wal_segsz_bytes as u64) + (offset as u64)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -60,7 +60,7 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
|
||||
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
|
||||
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
|
||||
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
|
||||
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
|
||||
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -70,7 +70,7 @@ pub fn IsXLogFileName(fname: &str) -> bool {
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn IsPartialXLogFileName(fname: &str) -> bool {
|
||||
return fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]);
|
||||
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
|
||||
}
|
||||
|
||||
pub fn get_current_timestamp() -> TimestampTz {
|
||||
@@ -181,7 +181,7 @@ fn find_end_of_wal_segment(
|
||||
}
|
||||
}
|
||||
}
|
||||
return last_valid_rec_pos as u32;
|
||||
last_valid_rec_pos as u32
|
||||
}
|
||||
|
||||
pub fn find_end_of_wal(
|
||||
@@ -237,7 +237,7 @@ pub fn find_end_of_wal(
|
||||
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
|
||||
return (high_ptr, high_tli);
|
||||
}
|
||||
return (0, 0);
|
||||
(0, 0)
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
|
||||
@@ -76,7 +76,7 @@ fn main() -> Result<()> {
|
||||
|
||||
// all other commands would need config
|
||||
|
||||
let repopath = PathBuf::from(zenith_repo_dir());
|
||||
let repopath = zenith_repo_dir();
|
||||
if !repopath.exists() {
|
||||
bail!(
|
||||
"Zenith repository does not exists in {}.\n\
|
||||
|
||||
Reference in New Issue
Block a user