diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 97fae0936c..25d801f714 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -7,7 +7,6 @@ use std::str::FromStr; pub type Oid = u32; pub type SystemId = u64; -pub type Result = std::result::Result; #[derive(Debug)] pub enum FeMessage { @@ -52,7 +51,7 @@ pub enum StartupRequestCode { } impl FeStartupMessage { - pub fn parse(buf: &mut BytesMut) -> Result> { + pub fn parse(buf: &mut BytesMut) -> io::Result> { const MAX_STARTUP_PACKET_LENGTH: usize = 10000; const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678; const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679; @@ -202,7 +201,7 @@ impl<'a> BeMessage<'a> { } impl FeMessage { - pub fn parse(buf: &mut BytesMut) -> Result> { + pub fn parse(buf: &mut BytesMut) -> io::Result> { if buf.len() < 5 { let to_read = 5 - buf.len(); buf.reserve(to_read); diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 8d790634ac..2a4dc7f7c5 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -2,6 +2,7 @@ /// WAL service listens for client connections and /// receive WAL from wal_proposer and send it to WAL receivers /// +use anyhow::{bail, Result}; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use fs2::FileExt; @@ -13,7 +14,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; -use std::io::{self, prelude::*, SeekFrom}; +use std::io::{self, Read, Seek, SeekFrom, Write}; use std::mem; use std::net::{TcpListener, TcpStream}; use std::str; @@ -220,18 +221,9 @@ trait NewSerializer: Serialize + DeserializeOwned { impl NewSerializer for T where T: Serialize + DeserializeOwned {} -/// Report and return IO error -macro_rules! io_error { - ($($arg:tt)*) => (error!($($arg)*); return Err(io::Error::new(io::ErrorKind::Other,format!($($arg)*)))) -} - /// Safe hex string parser returning proper result fn parse_hex_str(s: &str) -> Result { - if let Ok(val) = u32::from_str_radix(s, 16) { - Ok(val as u64) - } else { - io_error!("Invalid hex number {}", s); - } + Ok(u64::from_str_radix(s, 16)?) } impl SafeKeeperInfo { @@ -381,7 +373,7 @@ impl Timeline { match file.try_lock_exclusive() { Ok(()) => {} Err(e) => { - io_error!( + bail!( "Control file {:?} is locked by some other process: {}", &control_file_path, e @@ -404,10 +396,10 @@ impl Timeline { let my_info = SafeKeeperInfo::unpack(&mut input); if my_info.magic != SK_MAGIC { - io_error!("Invalid control file magic: {}", my_info.magic); + bail!("Invalid control file magic: {}", my_info.magic); } if my_info.format_version != SK_FORMAT_VERSION { - io_error!( + bail!( "Incompatible format version: {} vs. {}", my_info.format_version, SK_FORMAT_VERSION @@ -532,7 +524,7 @@ impl Connection { /* Check protocol compatibility */ if server_info.protocol_version != SK_PROTOCOL_VERSION { - io_error!( + bail!( "Incompatible protocol version {} vs. {}", server_info.protocol_version, SK_PROTOCOL_VERSION @@ -570,7 +562,7 @@ impl Connection { self.start_sending(); my_info.server.node_id.pack(&mut self.outbuf); self.send()?; - io_error!( + bail!( "Reject connection attempt with term {} because my term is {}", hex::encode(prop.node_id.term), hex::encode(my_info.server.node_id.term) @@ -610,7 +602,7 @@ impl Connection { /* Receive message header */ let req = self.read_req::()?; if req.sender_id != my_info.server.node_id { - io_error!("Sender NodeId is changed"); + bail!("Sender NodeId is changed"); } if req.begin_lsn == END_OF_STREAM { info!("Server stops streaming"); @@ -700,7 +692,7 @@ impl Connection { if self.inbuf.is_empty() { return Ok(None); } else { - io_error!("connection reset by peer"); + bail!("connection reset by peer"); } } } @@ -710,11 +702,12 @@ impl Connection { // Parse libpq message // fn parse_message(&mut self) -> Result> { - if !self.init_done { - FeStartupMessage::parse(&mut self.inbuf) + let msg = if !self.init_done { + FeStartupMessage::parse(&mut self.inbuf)? } else { - FeMessage::parse(&mut self.inbuf) - } + FeMessage::parse(&mut self.inbuf)? + }; + Ok(msg) } // @@ -728,7 +721,7 @@ impl Connection { // Send buffered messages // fn send(&mut self) -> Result<()> { - self.stream.write_all(&self.outbuf) + Ok(self.stream.write_all(&self.outbuf)?) } // @@ -772,7 +765,7 @@ impl Connection { break; } _ => { - io_error!("unexpected message"); + bail!("unexpected message"); } } } @@ -845,7 +838,7 @@ impl Connection { }; let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize; if wal_seg_size == 0 { - io_error!("Can not start replication before connecting to wal_proposer"); + bail!("Can not start replication before connecting to wal_proposer"); } let (wal_end, timeline) = self.find_end_of_wal(false); if start_pos == 0 { @@ -913,7 +906,7 @@ impl Connection { } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} Err(e) => { - return Err(e); + return Err(e.into()); } } @@ -942,7 +935,7 @@ impl Connection { Ok(opened_file) => file = opened_file, Err(e) => { error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e); + return Err(e.into()); } } } @@ -995,7 +988,7 @@ impl Connection { } else if q.body.starts_with(b"START_REPLICATION") { self.handle_start_replication(&q.body) } else { - io_error!("Unexpected command {:?}", q.body); + bail!("Unexpected command {:?}", q.body); } } @@ -1069,7 +1062,7 @@ impl Connection { } Err(e) => { error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e); + return Err(e.into()); } } }