From 1f3f4cfaf59bfb4470206d4246a287d2b5cda161 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Fri, 16 Apr 2021 16:55:04 -0700 Subject: [PATCH 1/5] clippy cleanup #2 - remove needless return - remove needless format! - remove a few more needless clone() - from_str_radix(_, 10) -> .parse() - remove needless reference - remove needless `mut` Also manually replaced a match statement with map_err() because after clippy was done with it, there was almost nothing left in the match expression. --- control_plane/src/compute.rs | 3 +- control_plane/src/local_env.rs | 8 ++--- control_plane/src/storage.rs | 8 ++--- pageserver/src/basebackup.rs | 14 ++++---- pageserver/src/bin/pageserver.rs | 6 ++-- pageserver/src/page_cache.rs | 8 ++--- pageserver/src/page_service.rs | 51 +++++++++++++--------------- pageserver/src/restore_local_repo.rs | 12 +++---- pageserver/src/restore_s3.rs | 29 +++++++--------- pageserver/src/tui.rs | 12 +++---- pageserver/src/tui_event.rs | 2 +- pageserver/src/tui_logger.rs | 2 +- pageserver/src/waldecoder.rs | 8 ++--- pageserver/src/walreceiver.rs | 2 +- pageserver/src/walredo.rs | 14 ++++---- postgres_ffi/src/lib.rs | 8 ++--- walkeeper/src/bin/wal_acceptor.rs | 2 +- walkeeper/src/wal_service.rs | 4 +-- walkeeper/src/xlog_utils.rs | 16 ++++----- zenith/src/main.rs | 2 +- 20 files changed, 101 insertions(+), 110 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 69c9deff7f..f95c2ba7b1 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -11,7 +11,6 @@ use std::{collections::BTreeMap, path::PathBuf}; use anyhow::{Context, Result}; use lazy_static::lazy_static; use regex::Regex; -use tar; use postgres::{Client, NoTls}; @@ -290,7 +289,7 @@ impl PostgresNode { // slot or something proper, to prevent the compute node // from removing WAL that hasn't been streamed to the safekeepr or // page server yet. But this will do for now. 
- self.append_conf("postgresql.conf", &format!("wal_keep_size='10TB'\n")); + self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n"); // Connect it to the page server. diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index adf5d6164c..10b1b92049 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -108,7 +108,7 @@ pub fn init() -> Result<()> { // ok, we are good to go let mut conf = LocalEnv { - repo_path: repo_path.clone(), + repo_path, pg_distrib_dir, zenith_distrib_dir, systemid: 0, @@ -254,7 +254,7 @@ pub fn test_env(testname: &str) -> LocalEnv { systemid: 0, }; init_repo(&mut local_env).expect("could not initialize zenith repository"); - return local_env; + local_env } // Find the directory where the binaries were put (i.e. target/debug/) @@ -266,7 +266,7 @@ pub fn cargo_bin_dir() -> PathBuf { pathbuf.pop(); } - return pathbuf; + pathbuf } #[derive(Debug, Clone, Copy)] @@ -358,7 +358,7 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result Result<(u32, u32, u32), FilePathError> { u32::from_str_radix(segno_match.unwrap().as_str(), 10)? 
}; - return Ok((relnode, forknum, segno)); + Ok((relnode, forknum, segno)) } fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { @@ -172,7 +172,7 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { if let Some(fname) = path.strip_prefix("global/") { let (_relnode, _forknum, _segno) = parse_filename(fname)?; - return Ok(()); + Ok(()) } else if let Some(dbpath) = path.strip_prefix("base/") { let mut s = dbpath.split("/"); let dbnode_str = s @@ -188,15 +188,15 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { let (_relnode, _forknum, _segno) = parse_filename(fname)?; - return Ok(()); + Ok(()) } else if let Some(_) = path.strip_prefix("pg_tblspc/") { // TODO - return Err(FilePathError::new("tablespaces not supported")); + Err(FilePathError::new("tablespaces not supported")) } else { - return Err(FilePathError::new("invalid relation data file name")); + Err(FilePathError::new("invalid relation data file name")) } } fn is_rel_file_path(path: &str) -> bool { - return parse_rel_file_path(path).is_ok(); + parse_rel_file_path(path).is_ok() } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index f8dfc32c5e..98c5eecee2 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -125,7 +125,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { .with_context(|| format!("failed to open {:?}", &log_filename))?; let daemonize = Daemonize::new() - .pid_file(repodir.clone().join("pageserver.pid")) + .pid_file(repodir.join("pageserver.pid")) .working_directory(repodir) .stdout(stdout) .stderr(stderr); @@ -197,7 +197,7 @@ fn init_logging(conf: &PageServerConf) -> Result Result u64 { let shared = self.shared.lock().unwrap(); - return shared.last_record_lsn; + shared.last_record_lsn } // diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c23537233d..f95dd84039 100644 --- a/pageserver/src/page_service.rs +++ 
b/pageserver/src/page_service.rs @@ -189,12 +189,11 @@ fn read_null_terminated(buf: &mut Bytes) -> Result { } result.put_u8(byte); } - return Ok(result.freeze()); + Ok(result.freeze()) } impl FeParseMessage { - pub fn parse(body: Bytes) -> Result { - let mut buf = body.clone(); + pub fn parse(mut buf: Bytes) -> Result { let _pstmt_name = read_null_terminated(&mut buf)?; let query_string = read_null_terminated(&mut buf)?; let nparams = buf.get_i16(); @@ -230,8 +229,7 @@ struct FeDescribeMessage { } impl FeDescribeMessage { - pub fn parse(body: Bytes) -> Result { - let mut buf = body.clone(); + pub fn parse(mut buf: Bytes) -> Result { let kind = buf.get_u8(); let _pstmt_name = read_null_terminated(&mut buf)?; @@ -264,8 +262,7 @@ struct FeExecuteMessage { } impl FeExecuteMessage { - pub fn parse(body: Bytes) -> Result { - let mut buf = body.clone(); + pub fn parse(mut buf: Bytes) -> Result { let portal_name = read_null_terminated(&mut buf)?; let maxrows = buf.get_i32(); @@ -292,8 +289,7 @@ impl FeExecuteMessage { struct FeBindMessage {} impl FeBindMessage { - pub fn parse(body: Bytes) -> Result { - let mut buf = body.clone(); + pub fn parse(mut buf: Bytes) -> Result { let portal_name = read_null_terminated(&mut buf)?; let _pstmt_name = read_null_terminated(&mut buf)?; @@ -323,8 +319,7 @@ impl FeBindMessage { struct FeCloseMessage {} impl FeCloseMessage { - pub fn parse(body: Bytes) -> Result { - let mut buf = body.clone(); + pub fn parse(mut buf: Bytes) -> Result { let _kind = buf.get_u8(); let _pstmt_or_portal_name = read_null_terminated(&mut buf)?; @@ -365,7 +360,7 @@ impl FeMessage { let mut body = body.freeze(); match tag { - b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body: body }))), + b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body }))), b'P' => Ok(Some(FeParseMessage::parse(body)?)), b'D' => Ok(Some(FeDescribeMessage::parse(body)?)), b'E' => Ok(Some(FeExecuteMessage::parse(body)?)), @@ -430,7 +425,7 @@ pub fn thread_main(conf: &PageServerConf) { 
let runtime_ref = Arc::new(runtime); - runtime_ref.clone().block_on(async { + runtime_ref.block_on(async { let listener = TcpListener::bind(conf.listen_addr).await.unwrap(); loop { @@ -540,7 +535,7 @@ impl Connection { BeMessage::RowDescription => { // XXX - let mut b = Bytes::from("data\0"); + let b = Bytes::from("data\0"); self.stream.write_u8(b'T').await?; self.stream @@ -548,7 +543,7 @@ impl Connection { .await?; self.stream.write_i16(1).await?; - self.stream.write_all(&mut b).await?; + self.stream.write_all(&b).await?; self.stream.write_i32(0).await?; /* table oid */ self.stream.write_i16(0).await?; /* attnum */ self.stream.write_i32(25).await?; /* TEXTOID */ @@ -560,34 +555,34 @@ impl Connection { // XXX: accept some text data BeMessage::DataRow => { // XXX - let mut b = Bytes::from("hello world"); + let b = Bytes::from("hello world"); self.stream.write_u8(b'D').await?; self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?; self.stream.write_i16(1).await?; self.stream.write_i32(b.len() as i32).await?; - self.stream.write_all(&mut b).await?; + self.stream.write_all(&b).await?; } BeMessage::ControlFile => { // TODO pass checkpoint and xid info in this message - let mut b = Bytes::from("hello pg_control"); + let b = Bytes::from("hello pg_control"); self.stream.write_u8(b'D').await?; self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?; self.stream.write_i16(1).await?; self.stream.write_i32(b.len() as i32).await?; - self.stream.write_all(&mut b).await?; + self.stream.write_all(&b).await?; } BeMessage::CommandComplete => { - let mut b = Bytes::from("SELECT 1\0"); + let b = Bytes::from("SELECT 1\0"); self.stream.write_u8(b'C').await?; self.stream.write_i32(4 + b.len() as i32).await?; - self.stream.write_all(&mut b).await?; + self.stream.write_all(&b).await?; } BeMessage::ZenithStatusResponse(resp) => { @@ -614,7 +609,7 @@ impl Connection { self.stream.write_u8(102).await?; /* tag from pagestore_client.h */ self.stream.write_u8(resp.ok as u8).await?; 
self.stream.write_u32(resp.n_blocks).await?; - self.stream.write_all(&mut resp.page.clone()).await?; + self.stream.write_all(&resp.page.clone()).await?; } } @@ -637,8 +632,8 @@ impl Connection { match m.kind { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { - let mut b = Bytes::from("N"); - self.stream.write_all(&mut b).await?; + let b = Bytes::from("N"); + self.stream.write_all(&b).await?; self.stream.flush().await?; } StartupRequestCode::Normal => { @@ -730,7 +725,7 @@ impl Connection { let caps = re.captures(&query_str); let caps = caps.unwrap(); - let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str().clone()).unwrap(); + let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str()).unwrap(); let connstr: String = String::from(caps.get(2).unwrap().as_str()); // Check that the timeline exists @@ -952,7 +947,7 @@ impl Connection { joinres.unwrap_err(), )); } - return joinres.unwrap(); + joinres.unwrap() }; let f_pump = async move { @@ -961,12 +956,12 @@ impl Connection { if buf.is_none() { break; } - let mut buf = buf.unwrap(); + let buf = buf.unwrap(); // CopyData stream.write_u8(b'd').await?; stream.write_u32((4 + buf.len()) as u32).await?; - stream.write_all(&mut buf).await?; + stream.write_all(&buf).await?; trace!("CopyData sent for {} bytes!", buf.len()); // FIXME: flush isn't really required, but makes it easier diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 5c39d805f6..8716536fb2 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -205,9 +205,9 @@ fn restore_relfile( let tag = page_cache::BufferTag { spcnode: spcoid, dbnode: dboid, - relnode: relnode, + relnode, forknum: forknum as u8, - blknum: blknum, + blknum, }; pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); /* @@ -236,7 +236,7 @@ fn restore_relfile( let tag = page_cache::RelTag { spcnode: spcoid, dbnode: dboid, - relnode: relnode, + relnode, 
forknum: forknum as u8, }; pcache.relsize_inc(&tag, blknum); @@ -254,7 +254,7 @@ fn restore_wal( ) -> Result<()> { let walpath = format!("timelines/{}/wal", timeline); - let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); + let mut waldecoder = WalStreamDecoder::new(startpoint); let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024); let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024); @@ -315,7 +315,7 @@ fn restore_wal( }; let rec = page_cache::WALRecord { - lsn: lsn, + lsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone(), main_data_offset: decoded.main_data_offset, @@ -485,5 +485,5 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> { u32::from_str_radix(segno_match.unwrap().as_str(), 10)? }; - return Ok((relnode, forknum, segno)); + Ok((relnode, forknum, segno)) } diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index f3e642df67..d3cc86e4e0 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -38,12 +38,9 @@ pub fn restore_main(conf: &PageServerConf) { let result = restore_chunk(conf).await; match result { - Ok(_) => { - return; - } + Ok(_) => {} Err(err) => { error!("S3 error: {}", err); - return; } } }); @@ -199,7 +196,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> { .ok_or_else(|| FilePathError::new("invalid relation data file name"))?; let relnode_str = caps.name("relnode").unwrap().as_str(); - let relnode = u32::from_str_radix(relnode_str, 10)?; + let relnode: u32 = relnode_str.parse()?; let forkname_match = caps.name("forkname"); let forkname = if forkname_match.is_none() { @@ -213,14 +210,14 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> { let segno = if segno_match.is_none() { 0 } else { - u32::from_str_radix(segno_match.unwrap().as_str(), 10)? + segno_match.unwrap().as_str().parse::()? 
};
-    let lsn_hi = u64::from_str_radix(caps.name("lsnhi").unwrap().as_str(), 16)?;
-    let lsn_lo = u64::from_str_radix(caps.name("lsnlo").unwrap().as_str(), 16)?;
+    let lsn_hi = u64::from_str_radix(caps.name("lsnhi").unwrap().as_str(), 16)?;
+    let lsn_lo = u64::from_str_radix(caps.name("lsnlo").unwrap().as_str(), 16)?;
     let lsn = lsn_hi << 32 | lsn_lo;
-    return Ok((relnode, forknum, segno, lsn));
+    Ok((relnode, forknum, segno, lsn))
 }
 fn parse_rel_file_path(path: &str) -> Result { @@ -244,20 +241,20 @@ fn parse_rel_file_path(path: &str) -> Result Result slog_scope::GlobalLoggerGuard { { return true; } - return false; + false }) .fuse(); @@ -41,7 +41,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard { { return true; } - return false; + false }) .fuse(); @@ -52,7 +52,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard { { return true; } - return false; + false }) .fuse(); @@ -65,7 +65,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard { { return true; } - return false; + false }) .fuse(); @@ -84,11 +84,11 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard { return true; } - return false; + false }) .fuse(); let logger = slog::Logger::root(drain, slog::o!()); - return slog_scope::set_global_logger(logger); + slog_scope::set_global_logger(logger) } pub fn ui_main() -> Result<(), Box> { diff --git a/pageserver/src/tui_event.rs b/pageserver/src/tui_event.rs index 5546b680ee..d88cac5d5b 100644 --- a/pageserver/src/tui_event.rs +++ b/pageserver/src/tui_event.rs @@ -76,8 +76,8 @@ impl Events { }; Events { rx, - ignore_exit_key, input_handle, + ignore_exit_key, tick_handle, } } diff --git a/pageserver/src/tui_logger.rs b/pageserver/src/tui_logger.rs index dcb4a23467..663add4065 100644 --- a/pageserver/src/tui_logger.rs +++ b/pageserver/src/tui_logger.rs @@ -51,7 +51,7 @@ impl Drain for TuiLogger { events.pop_back(); } - return Ok(()); + Ok(()) } } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 61b140eda4..7bd7320691 100644 ---
a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -227,7 +227,7 @@ impl WalStreamDecoder { // FIXME: check that hdr.xlp_rem_len matches self.contlen //println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len); - return hdr; + hdr } #[allow(non_snake_case)] @@ -239,7 +239,7 @@ impl WalStreamDecoder { xlp_xlog_blcksz: self.inputbuf.get_u32_le(), }; - return hdr; + hdr } } @@ -350,7 +350,7 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool { buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); - return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID; + xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID } #[derive(Clone, Copy)] @@ -651,6 +651,6 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { DecodedWALRecord { record: rec, blocks, - main_data_offset: main_data_offset, + main_data_offset, } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 3e72b5e747..50be785aab 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -409,7 +409,7 @@ fn write_wal_file( let mut bytes_written: usize = 0; let mut partial; let mut start_pos = startpos; - const ZERO_BLOCK: &'static [u8] = &[0u8; XLOG_BLCKSZ]; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index d942029ee0..86e3ead64b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -224,7 +224,7 @@ fn handle_apply_request( // Wake up the requester, whether the operation succeeded or not. 
entry_rc.walredo_condvar.notify_all(); - return result; + result } struct WalRedoProcess { @@ -317,7 +317,7 @@ impl WalRedoProcess { ) -> Result { let mut stdin = self.stdin.borrow_mut(); let mut stdout = self.stdout.borrow_mut(); - return runtime.block_on(async { + runtime.block_on(async { // // This async block sends all the commands to the process. // @@ -380,7 +380,7 @@ impl WalRedoProcess { let buf = res.0; Ok::(Bytes::from(std::vec::Vec::from(buf))) - }); + }) } } @@ -398,7 +398,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { assert!(buf.len() == 1 + len); - return buf.freeze(); + buf.freeze() } fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { @@ -418,7 +418,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { assert!(buf.len() == 1 + len); - return buf.freeze(); + buf.freeze() } fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { @@ -432,7 +432,7 @@ fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { assert!(buf.len() == 1 + len); - return buf.freeze(); + buf.freeze() } fn build_get_page_msg(tag: BufferTag) -> Bytes { @@ -449,5 +449,5 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes { assert!(buf.len() == 1 + len); - return buf.freeze(); + buf.freeze() } diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index b6cf6bdb2b..59cad0db39 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -18,13 +18,13 @@ impl ControlFileData { controlfile = unsafe { std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) }; - return controlfile; + controlfile } } -pub fn decode_pg_control(buf: Bytes) -> Result { +pub fn decode_pg_control(mut buf: Bytes) -> Result { let mut b: [u8; SIZEOF_CONTROLDATA] = [0u8; SIZEOF_CONTROLDATA]; - buf.clone().copy_to_slice(&mut b); + buf.copy_to_slice(&mut b); let controlfile: ControlFileData; @@ -63,5 +63,5 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes { // Fill the rest of the control file with zeros. 
buf.resize(PG_CONTROL_FILE_SIZE as usize, 0); - return buf.into(); + buf.into() } diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 8dfa31e23b..57503b1912 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -69,7 +69,7 @@ fn main() -> Result<()> { let mut conf = WalAcceptorConf { data_dir: PathBuf::from("./"), - systemid: systemid, + systemid, daemonize: false, no_sync: false, pageserver_addr: None, diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 64627d33b5..b02446cadf 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -444,7 +444,7 @@ impl Timeline { fn get_hs_feedback(&self) -> HotStandbyFeedback { let shared_state = self.mutex.lock().unwrap(); - return shared_state.hs_feedback; + shared_state.hs_feedback } // Load and lock control file (prevent running more than one instance of safekeeper) @@ -527,7 +527,7 @@ impl Timeline { let file = shared_state.control_file.as_mut().unwrap(); file.seek(SeekFrom::Start(0))?; - file.write_all(&mut buf[..])?; + file.write_all(&buf[..])?; if sync { file.sync_all()?; } diff --git a/walkeeper/src/xlog_utils.rs b/walkeeper/src/xlog_utils.rs index 7c18131186..c31a160cce 100644 --- a/walkeeper/src/xlog_utils.rs +++ b/walkeeper/src/xlog_utils.rs @@ -23,17 +23,17 @@ pub type XLogSegNo = u64; #[allow(non_snake_case)] pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 { - return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1); + (xlogptr as u32) & (wal_segsz_bytes as u32 - 1) } #[allow(non_snake_case)] pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { - return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo; + (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo } #[allow(non_snake_case)] pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo { - return xlogptr / wal_segsz_bytes as u64; + xlogptr / wal_segsz_bytes as u64 } 
#[allow(non_snake_case)] @@ -42,7 +42,7 @@ pub fn XLogSegNoOffsetToRecPtr( offset: u32, wal_segsz_bytes: usize, ) -> XLogRecPtr { - return segno * (wal_segsz_bytes as u64) + (offset as u64); + segno * (wal_segsz_bytes as u64) + (offset as u64) } #[allow(non_snake_case)] @@ -60,7 +60,7 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin let tli = u32::from_str_radix(&fname[0..8], 16).unwrap(); let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo; let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo; - return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli); + (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli) } #[allow(non_snake_case)] @@ -70,7 +70,7 @@ pub fn IsXLogFileName(fname: &str) -> bool { #[allow(non_snake_case)] pub fn IsPartialXLogFileName(fname: &str) -> bool { - return fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]); + fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]) } pub fn get_current_timestamp() -> TimestampTz { @@ -181,7 +181,7 @@ fn find_end_of_wal_segment( } } } - return last_valid_rec_pos as u32; + last_valid_rec_pos as u32 } pub fn find_end_of_wal( @@ -237,7 +237,7 @@ pub fn find_end_of_wal( let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); return (high_ptr, high_tli); } - return (0, 0); + (0, 0) } pub fn main() { diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 53d1528a6b..05055f5c9e 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -76,7 +76,7 @@ fn main() -> Result<()> { // all other commands would need config - let repopath = PathBuf::from(zenith_repo_dir()); + let repopath = zenith_repo_dir(); if !repopath.exists() { bail!( "Zenith repository does not exists in {}.\n\ From 8060e17b50c585001d418e5020c1d2575b63206e Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Tue, 20 Apr 2021 11:40:45 -0700 Subject: [PATCH 2/5] add SeqWait SeqWait adds a way to .await the 
arrival of some sequence number. It provides wait_for(num) which is an async fn, and advance(num) which is synchronous. This should be useful in solving the page cache deadlocks, and may be useful in other areas too. This implementation still uses a Mutex internally, but only for a brief critical section. If we find this code broadly useful and start to care more about executor stalls due to unfair thread scheduling, there might be ways to make it lock-free. --- Cargo.lock | 4 + zenith_utils/Cargo.toml | 5 + zenith_utils/src/lib.rs | 2 + zenith_utils/src/seqwait.rs | 200 ++++++++++++++++++++++++++++++++++++ 4 files changed, 211 insertions(+) create mode 100644 zenith_utils/src/seqwait.rs diff --git a/Cargo.lock b/Cargo.lock index 585131dbb3..94e181ebe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2672,3 +2672,7 @@ dependencies = [ [[package]] name = "zenith_utils" version = "0.1.0" +dependencies = [ + "thiserror", + "tokio", +] diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 77bc1e9ecb..a26a772c97 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -5,3 +5,8 @@ authors = ["Eric Seppanen "] edition = "2018" [dependencies] +tokio = { version = "1.5", features = ["sync", "time" ] } +thiserror = "1" + +[dev-dependencies] +tokio = { version = "1.5", features = ["macros", "rt"] } diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 2d86ad041f..8acd9cb84b 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -1,2 +1,4 @@ //! zenith_utils is intended to be a place to put code that is shared //! between other crates in this repository. 
+ +pub mod seqwait; diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs new file mode 100644 index 0000000000..91805dc19e --- /dev/null +++ b/zenith_utils/src/seqwait.rs @@ -0,0 +1,200 @@ +use std::collections::BTreeMap; +use std::mem; +use std::sync::Mutex; +use std::time::Duration; +use tokio::sync::watch::{channel, Receiver, Sender}; +use tokio::time::timeout; + +/// An error happened while waiting for a number +#[derive(Debug, PartialEq, thiserror::Error)] +#[error("SeqWaitError")] +pub enum SeqWaitError { + /// The wait timeout was reached + Timeout, + /// [`SeqWait::shutdown`] was called + Shutdown, +} + +/// Internal components of a `SeqWait` +struct SeqWaitInt { + waiters: BTreeMap, Receiver<()>)>, + current: u64, + shutdown: bool, +} + +/// A tool for waiting on a sequence number +/// +/// This provides a way to await the arrival of a number. +/// As soon as the number arrives by another caller calling +/// [`advance`], then the waiter will be woken up. +/// +/// This implementation takes a blocking Mutex on both [`wait_for`] +/// and [`advance`], meaning there may be unexpected executor blocking +/// due to thread scheduling unfairness. There are probably better +/// implementations, but we can probably live with this for now. +/// +/// [`wait_for`]: SeqWait::wait_for +/// [`advance`]: SeqWait::advance +/// +pub struct SeqWait { + internal: Mutex, +} + +impl SeqWait { + /// Create a new `SeqWait`, initialized to a particular number + pub fn new(starting_num: u64) -> Self { + let internal = SeqWaitInt { + waiters: BTreeMap::new(), + current: starting_num, + shutdown: false, + }; + SeqWait { + internal: Mutex::new(internal), + } + } + + /// Shut down a `SeqWait`, causing all waiters (present and + /// future) to return an error. + pub fn shutdown(&self) { + let waiters = { + // Prevent new waiters; wake all those that exist. + // Wake everyone with an error. 
+ let mut internal = self.internal.lock().unwrap(); + + // This will steal the entire waiters map. + // When we drop it all waiters will be woken. + mem::take(&mut internal.waiters); + + // Drop the lock as we exit this scope. + }; + + // When we drop the waiters list, each Receiver will + // be woken with an error. + // This drop doesn't need to be explicit; it's done + // here to make it easier to read the code and understand + // the order of events. + drop(waiters); + } + + /// Wait for a number to arrive + /// + /// This call won't complete until someone has called `advance` + /// with a number greater than or equal to the one we're waiting for. + pub async fn wait_for(&self, num: u64) -> Result<(), SeqWaitError> { + let mut rx = { + let mut internal = self.internal.lock().unwrap(); + if internal.current >= num { + return Ok(()); + } + if internal.shutdown { + return Err(SeqWaitError::Shutdown); + } + + // If we already have a channel for waiting on this number, reuse it. + if let Some((_, rx)) = internal.waiters.get_mut(&num) { + // an Err from changed() means the sender was dropped. + rx.clone() + } else { + // Create a new channel. + let (tx, rx) = channel(()); + internal.waiters.insert(num, (tx, rx.clone())); + rx + } + // Drop the lock as we exit this scope. + }; + rx.changed().await.map_err(|_| SeqWaitError::Shutdown) + } + + /// Wait for a number to arrive + /// + /// This call won't complete until someone has called `advance` + /// with a number greater than or equal to the one we're waiting for. + /// + /// If that hasn't happened after the specified timeout duration, + /// [`SeqWaitError::Timeout`] will be returned. + pub async fn wait_for_timeout( + &self, + num: u64, + timeout_duration: Duration, + ) -> Result<(), SeqWaitError> { + timeout(timeout_duration, self.wait_for(num)) + .await + .unwrap_or(Err(SeqWaitError::Timeout)) + } + + /// Announce a new number has arrived + /// + /// All waiters at this value or below will be woken. 
+ /// + /// `advance` will panic if you send it a lower number than + /// a previous call. + pub fn advance(&self, num: u64) { + let wake_these = { + let mut internal = self.internal.lock().unwrap(); + + if internal.current > num { + panic!( + "tried to advance backwards, from {} to {}", + internal.current, num + ); + } + internal.current = num; + + // split_off will give me all the high-numbered waiters, + // so split and then swap. Everything at or above (num + 1) + // gets to stay. + let mut split = internal.waiters.split_off(&(num + 1)); + std::mem::swap(&mut split, &mut internal.waiters); + split + }; + + for (_wake_num, (tx, _rx)) in wake_these { + // This can fail if there are no receivers. + // We don't care; discard the error. + let _ = tx.send(()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use tokio::time::{sleep, Duration}; + + #[tokio::test] + async fn seqwait() { + let seq = Arc::new(SeqWait::new(0)); + let seq2 = Arc::clone(&seq); + let seq3 = Arc::clone(&seq); + tokio::spawn(async move { + seq2.wait_for(42).await.expect("wait_for 42"); + seq2.advance(100); + seq2.wait_for(999).await.expect_err("no 999"); + }); + tokio::spawn(async move { + seq3.wait_for(42).await.expect("wait_for 42"); + seq3.wait_for(0).await.expect("wait_for 0"); + }); + sleep(Duration::from_secs(1)).await; + seq.advance(99); + seq.wait_for(100).await.expect("wait_for 100"); + seq.shutdown(); + } + + #[tokio::test] + async fn seqwait_timeout() { + let seq = Arc::new(SeqWait::new(0)); + let seq2 = Arc::clone(&seq); + tokio::spawn(async move { + let timeout = Duration::from_millis(1); + let res = seq2.wait_for_timeout(42, timeout).await; + assert_eq!(res, Err(SeqWaitError::Timeout)); + }); + sleep(Duration::from_secs(1)).await; + // This will attempt to wake, but nothing will happen + // because the waiter already dropped its Receiver. 
+ seq.advance(99); + } + +} From 2cd730d31fa940ab8699a8a50f396f9fc7035e26 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Tue, 20 Apr 2021 12:30:21 -0700 Subject: [PATCH 3/5] page_cache: replace long mutex sleep with SeqWait When calling into the page cache, it was possible to wait on a blocking mutex, which can stall the async executor. Replace that sleep with a SeqWait::wait_for(lsn).await so that the executor can go on with other work while we wait. Change walreceiver_works to an AtomicBool to avoid the awkwardness of taking the lock, then dropping it while we call wait_for and then acquiring it again to do real work. --- Cargo.lock | 1 + pageserver/Cargo.toml | 1 + pageserver/src/page_cache.rs | 89 +++++++++++++++------------------- pageserver/src/page_service.rs | 2 +- 4 files changed, 41 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94e181ebe1..5e8d3dd20f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1339,6 +1339,7 @@ dependencies = [ "tokio-stream", "tui", "walkdir", + "zenith_utils", ] [[package]] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 05f5213ac4..b161be36fe 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -40,3 +40,4 @@ hex = "0.4.3" tar = "0.4.33" postgres_ffi = { path = "../postgres_ffi" } +zenith_utils = { path = "../zenith_utils" } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 7eb741e5f2..7db50aee1f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -9,7 +9,7 @@ use crate::restore_local_repo::restore_timeline; use crate::ZTimelineId; use crate::{walredo, PageServerConf}; -use anyhow::bail; +use anyhow::{bail, Context}; use bytes::Bytes; use core::ops::Bound::Included; use crossbeam_channel::unbounded; @@ -18,12 +18,13 @@ use lazy_static::lazy_static; use log::*; use rand::Rng; use std::collections::{BTreeMap, HashMap}; -use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use 
std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; use std::{convert::TryInto, ops::AddAssign}; +use zenith_utils::seqwait::SeqWait; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -35,7 +36,8 @@ pub struct PageCache { pub walredo_sender: Sender>, pub walredo_receiver: Receiver>, - valid_lsn_condvar: Condvar, + // Allows .await on the arrival of a particular LSN. + seqwait_lsn: SeqWait, // Counters, for metrics collection. pub num_entries: AtomicU64, @@ -48,6 +50,7 @@ pub struct PageCache { pub first_valid_lsn: AtomicU64, pub last_valid_lsn: AtomicU64, pub last_record_lsn: AtomicU64, + walreceiver_works: AtomicBool, } #[derive(Clone)] @@ -106,7 +109,6 @@ struct PageCacheShared { first_valid_lsn: u64, last_valid_lsn: u64, last_record_lsn: u64, - walreceiver_works: bool, } lazy_static! { @@ -170,9 +172,8 @@ fn init_page_cache() -> PageCache { first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, - walreceiver_works: false, }), - valid_lsn_condvar: Condvar::new(), + seqwait_lsn: SeqWait::new(0), walredo_sender: s, walredo_receiver: r, @@ -185,6 +186,7 @@ fn init_page_cache() -> PageCache { first_valid_lsn: AtomicU64::new(0), last_valid_lsn: AtomicU64::new(0), last_record_lsn: AtomicU64::new(0), + walreceiver_works: AtomicBool::new(false), } } @@ -276,7 +278,7 @@ impl PageCache { // // Returns an 8k page image // - pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result { + pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); // Look up cache entry. If it's a page image, return that. 
If it's a WAL record, @@ -284,50 +286,32 @@ impl PageCache { let minkey = CacheKey { tag, lsn: 0 }; let maxkey = CacheKey { tag, lsn }; - let entry_rc: Arc; - { - let mut shared = self.shared.lock().unwrap(); - let mut waited = false; - + if self.walreceiver_works.load(Ordering::Acquire) { + self.seqwait_lsn + .wait_for_timeout(lsn, TIMEOUT) + .await + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ) + })?; + } else { // There is a a race at postgres instance start // when we request a page before walsender established connection // and was able to stream the page. Just don't wait and return what we have. // TODO is there any corner case when this is incorrect? - if !shared.walreceiver_works { - trace!( - " walreceiver doesn't work yet last_valid_lsn {}, requested {}", - shared.last_valid_lsn, - lsn - ); - } + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); + } - if shared.walreceiver_works { - while lsn > shared.last_valid_lsn { - // TODO: Wait for the WAL receiver to catch up - waited = true; - trace!( - "not caught up yet: {}, requested {}", - shared.last_valid_lsn, - lsn - ); - let wait_result = self - .valid_lsn_condvar - .wait_timeout(shared, TIMEOUT) - .unwrap(); - - shared = wait_result.0; - if wait_result.1.timed_out() { - bail!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff - ); - } - } - } - if waited { - trace!("caught up now, continuing"); - } + let entry_rc: Arc; + { + let shared = self.shared.lock().unwrap(); if lsn < shared.first_valid_lsn { bail!( @@ -540,11 +524,11 @@ impl PageCache { if lsn >= oldlsn { // Now we receive entries from walreceiver and should wait if from_walreceiver { - shared.walreceiver_works = true; + self.walreceiver_works.store(true, Ordering::Release); } shared.last_valid_lsn = lsn; - 
self.valid_lsn_condvar.notify_all(); + self.seqwait_lsn.advance(lsn); self.last_valid_lsn.store(lsn, Ordering::Relaxed); } else { @@ -570,7 +554,7 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.valid_lsn_condvar.notify_all(); + self.seqwait_lsn.advance(lsn); self.last_valid_lsn.store(lsn, Ordering::Relaxed); self.last_record_lsn.store(lsn, Ordering::Relaxed); @@ -620,7 +604,7 @@ impl PageCache { // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) // // - pub fn _test_get_page_at_lsn(&self) { + pub async fn _test_get_page_at_lsn(&self) { // for quick testing of the get_page_at_lsn() funcion. // // Get a random page from the page cache. Apply all its WAL, by requesting @@ -650,7 +634,10 @@ impl PageCache { } info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { + match self + .get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) + .await + { Ok(_img) => { // This prints out the whole page image. 
//println!("{:X?}", img); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f95dd84039..99e93c3925 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -852,7 +852,7 @@ impl Connection { blknum: req.blkno, }; - let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) { + let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn).await { Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0, From 9b71ae7dce74da148f186baa4ce185d8b7a9b06c Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Wed, 21 Apr 2021 12:45:22 -0700 Subject: [PATCH 4/5] page_cache: add an assert on the last_valid_lsn --- pageserver/src/page_cache.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 7db50aee1f..85980b08ff 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -286,7 +286,8 @@ impl PageCache { let minkey = CacheKey { tag, lsn: 0 }; let maxkey = CacheKey { tag, lsn }; - if self.walreceiver_works.load(Ordering::Acquire) { + let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); + if walreceiver_works { self.seqwait_lsn .wait_for_timeout(lsn, TIMEOUT) .await @@ -313,6 +314,10 @@ impl PageCache { { let shared = self.shared.lock().unwrap(); + if walreceiver_works { + assert!(lsn <= shared.last_valid_lsn); + } + if lsn < shared.first_valid_lsn { bail!( "LSN {:X}/{:X} has already been removed", From a4fd1e1a808e6acf159ea298cb1c9434bb05880b Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 22 Apr 2021 09:20:05 +0300 Subject: [PATCH 5/5] Cleanup more issues noted by 'clippy' Mostly stuff that was introduced by commit 3600b33f1c. 
--- control_plane/src/compute.rs | 6 +++--- pageserver/src/basebackup.rs | 2 +- pageserver/src/bin/pageserver.rs | 4 ++-- pageserver/src/page_cache.rs | 2 +- pageserver/src/page_service.rs | 14 +++++++------- pageserver/src/restore_local_repo.rs | 7 +++---- pageserver/src/walredo.rs | 8 ++++---- walkeeper/src/pq_protocol.rs | 4 ++-- walkeeper/src/wal_service.rs | 16 ++++++++-------- zenith/src/main.rs | 19 ++++++++----------- 10 files changed, 39 insertions(+), 43 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index f95c2ba7b1..0b38af94b1 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -189,11 +189,11 @@ impl PostgresNode { ); let port: u16 = CONF_PORT_RE .captures(config.as_str()) - .ok_or(anyhow::Error::msg(err_msg.clone() + " 1"))? + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? .iter() .last() - .ok_or(anyhow::Error::msg(err_msg.clone() + " 2"))? - .ok_or(anyhow::Error::msg(err_msg.clone() + " 3"))? + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? + .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? 
.as_str() .parse() .with_context(|| err_msg)?; diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 6df812cde9..9141155d86 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -174,7 +174,7 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { Ok(()) } else if let Some(dbpath) = path.strip_prefix("base/") { - let mut s = dbpath.split("/"); + let mut s = dbpath.split('/'); let dbnode_str = s .next() .ok_or_else(|| FilePathError::new("invalid relation data file name"))?; diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 98c5eecee2..495ec41b65 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -169,9 +169,9 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { .unwrap(); threads.push(page_server_thread); - if tui_thread.is_some() { + if let Some(tui_thread) = tui_thread { // The TUI thread exits when the user asks to Quit. - tui_thread.unwrap().join().unwrap(); + tui_thread.join().unwrap(); } else { // In non-interactive mode, wait forever. for t in threads { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 85980b08ff..8b9692aaff 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -487,7 +487,7 @@ impl PageCache { let oldentry = shared.pagecache.insert(key, Arc::new(entry)); self.num_entries.fetch_add(1, Ordering::Relaxed); - if !oldentry.is_none() { + if oldentry.is_some() { error!( "overwriting WAL record with LSN {:X}/{:X} in page cache", lsn >> 32, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 99e93c3925..db81333747 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -203,7 +203,7 @@ impl FeParseMessage { // now, just ignore the statement name, assuming that the client never // uses more than one prepared statement at a time. 
/* - if pstmt_name.len() != 0 { + if !pstmt_name.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "named prepared statements not implemented in Parse", @@ -235,7 +235,7 @@ impl FeDescribeMessage { // FIXME: see FeParseMessage::parse /* - if pstmt_name.len() != 0 { + if !pstmt_name.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "named prepared statements not implemented in Describe", @@ -266,7 +266,7 @@ impl FeExecuteMessage { let portal_name = read_null_terminated(&mut buf)?; let maxrows = buf.get_i32(); - if portal_name.len() != 0 { + if !portal_name.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "named portals not implemented", @@ -293,7 +293,7 @@ impl FeBindMessage { let portal_name = read_null_terminated(&mut buf)?; let _pstmt_name = read_null_terminated(&mut buf)?; - if portal_name.len() != 0 { + if !portal_name.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "named portals not implemented", @@ -302,7 +302,7 @@ impl FeBindMessage { // FIXME: see FeParseMessage::parse /* - if pstmt_name.len() != 0 { + if !pstmt_name.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "named prepared statements not implemented", @@ -941,10 +941,10 @@ impl Connection { let f_tar2 = async { let joinres = f_tar.await; - if joinres.is_err() { + if let Err(joinreserr) = joinres { return Err(io::Error::new( io::ErrorKind::InvalidData, - joinres.unwrap_err(), + joinreserr, )); } joinres.unwrap() diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 8716536fb2..5cd927486a 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -186,10 +186,9 @@ fn restore_relfile( // Does it look like a relation file? 
let p = parse_relfilename(path.file_name().unwrap().to_str().unwrap()); - if p.is_err() { - let e = p.unwrap_err(); + if let Err(e) = p { warn!("unrecognized file in snapshot: {:?} ({})", path, e); - return Err(e)?; + return Err(e.into()); } let (relnode, forknum, segno) = p.unwrap(); @@ -266,7 +265,7 @@ fn restore_wal( // It could be as .partial if !PathBuf::from(&path).exists() { - path = path + ".partial"; + path += ".partial"; } // Slurp the WAL file diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 86e3ead64b..bf0159af4d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -388,7 +388,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { let len = 4 + 5 * 4; let mut buf = BytesMut::with_capacity(1 + len); - buf.put_u8('B' as u8); + buf.put_u8(b'B'); buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); @@ -407,7 +407,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { let len = 4 + 5 * 4 + base_img.len(); let mut buf = BytesMut::with_capacity(1 + len); - buf.put_u8('P' as u8); + buf.put_u8(b'P'); buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); @@ -425,7 +425,7 @@ fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { let len = 4 + 8 + rec.len(); let mut buf = BytesMut::with_capacity(1 + len); - buf.put_u8('A' as u8); + buf.put_u8(b'A'); buf.put_u32(len as u32); buf.put_u64(endlsn); buf.put(rec); @@ -439,7 +439,7 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes { let len = 4 + 5 * 4; let mut buf = BytesMut::with_capacity(1 + len); - buf.put_u8('G' as u8); + buf.put_u8(b'G'); buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index f6e18d9aa4..57517c322f 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -91,9 +91,9 @@ impl FeStartupMessage { options = true; } else if options { for opt in p.split(' ') { - 
if opt.starts_with("ztimelineid=") { + if let Some(ztimelineid_str) = opt.strip_prefix("ztimelineid=") { // FIXME: rethrow parsing error, don't unwrap - timelineid = Some(ZTimelineId::from_str(&opt[12..]).unwrap()); + timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap()); break; } } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index b02446cadf..9d7e6a8bfc 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -554,7 +554,7 @@ impl Connection { async fn run(&mut self) -> Result<()> { self.inbuf.resize(4, 0u8); self.stream.read_exact(&mut self.inbuf[0..4]).await?; - let startup_pkg_len = BigEndian::read_u32(&mut self.inbuf[0..4]); + let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]); if startup_pkg_len == 0 { self.receive_wal().await?; // internal protocol between wal_proposer and wal_acceptor } else { @@ -997,12 +997,12 @@ impl Connection { // Try to fetch replica's feedback match self.stream.try_read_buf(&mut self.inbuf) { Ok(0) => break, - Ok(_) => match self.parse_message()? { - Some(FeMessage::CopyData(m)) => self - .timeline() - .add_hs_feedback(HotStandbyFeedback::parse(&m.body)), - _ => {} - }, + Ok(_) => { + if let Some(FeMessage::CopyData(m)) = self.parse_message()? 
{ + self.timeline() + .add_hs_feedback(HotStandbyFeedback::parse(&m.body)) + } + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} Err(e) => { return Err(e); @@ -1102,7 +1102,7 @@ impl Connection { let mut bytes_written: usize = 0; let mut partial; let mut start_pos = startpos; - const ZERO_BLOCK: &'static [u8] = &[0u8; XLOG_BLCKSZ]; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; /* Extract WAL location for this block */ let mut xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize; diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 05055f5c9e..8cbd97e1ea 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -186,7 +186,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane .nodes .get(name) - .ok_or(anyhow!("postgres {} is not found", name))?; + .ok_or_else(|| anyhow!("postgres {} is not found", name))?; node.start()?; } ("stop", Some(sub_m)) => { @@ -194,7 +194,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane .nodes .get(name) - .ok_or(anyhow!("postgres {} is not found", name))?; + .ok_or_else(|| anyhow!("postgres {} is not found", name))?; node.stop()?; } @@ -277,19 +277,19 @@ fn list_branches() -> Result<()> { // // fn parse_point_in_time(s: &str) -> Result { - let mut strings = s.split("@"); + let mut strings = s.split('@'); let name = strings.next().unwrap(); let lsn: Option; if let Some(lsnstr) = strings.next() { - let mut s = lsnstr.split("/"); + let mut s = lsnstr.split('/'); let lsn_hi: u64 = s .next() - .ok_or(anyhow!("invalid LSN in point-in-time specification"))? + .ok_or_else(|| anyhow!("invalid LSN in point-in-time specification"))? .parse()?; let lsn_lo: u64 = s .next() - .ok_or(anyhow!("invalid LSN in point-in-time specification"))? + .ok_or_else(|| anyhow!("invalid LSN in point-in-time specification"))? 
.parse()?; lsn = Some(lsn_hi << 32 | lsn_lo); } else { @@ -312,11 +312,8 @@ fn parse_point_in_time(s: &str) -> Result { let pointstr = fs::read_to_string(branchpath)?; let mut result = parse_point_in_time(&pointstr)?; - if lsn.is_some() { - result.lsn = lsn.unwrap(); - } else { - result.lsn = 0; - } + + result.lsn = lsn.unwrap_or(0); return Ok(result); }