wal_service: use anyhow for error handling

We may eventually want precise error types for some of this, but
anyhow::Error is a lot easier than trying to force io::Error.
This commit is contained in:
Eric Seppanen
2021-05-07 12:53:54 -07:00
parent 8d8bc304c1
commit 28b4d9abb3
2 changed files with 24 additions and 32 deletions

View File

@@ -7,7 +7,6 @@ use std::str::FromStr;
pub type Oid = u32;
pub type SystemId = u64;
pub type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)]
pub enum FeMessage {
@@ -52,7 +51,7 @@ pub enum StartupRequestCode {
}
impl FeStartupMessage {
pub fn parse(buf: &mut BytesMut) -> Result<Option<FeMessage>> {
pub fn parse(buf: &mut BytesMut) -> io::Result<Option<FeMessage>> {
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678;
const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679;
@@ -202,7 +201,7 @@ impl<'a> BeMessage<'a> {
}
impl FeMessage {
pub fn parse(buf: &mut BytesMut) -> Result<Option<FeMessage>> {
pub fn parse(buf: &mut BytesMut) -> io::Result<Option<FeMessage>> {
if buf.len() < 5 {
let to_read = 5 - buf.len();
buf.reserve(to_read);

View File

@@ -2,6 +2,7 @@
/// WAL service listens for client connections and
/// receive WAL from wal_proposer and send it to WAL receivers
///
use anyhow::{bail, Result};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use fs2::FileExt;
@@ -13,7 +14,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, prelude::*, SeekFrom};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::mem;
use std::net::{TcpListener, TcpStream};
use std::str;
@@ -220,18 +221,9 @@ trait NewSerializer: Serialize + DeserializeOwned {
impl<T> NewSerializer for T where T: Serialize + DeserializeOwned {}
/// Report and return IO error
macro_rules! io_error {
($($arg:tt)*) => (error!($($arg)*); return Err(io::Error::new(io::ErrorKind::Other,format!($($arg)*))))
}
/// Safe hex string parser returning proper result
fn parse_hex_str(s: &str) -> Result<u64> {
if let Ok(val) = u32::from_str_radix(s, 16) {
Ok(val as u64)
} else {
io_error!("Invalid hex number {}", s);
}
Ok(u64::from_str_radix(s, 16)?)
}
impl SafeKeeperInfo {
@@ -381,7 +373,7 @@ impl Timeline {
match file.try_lock_exclusive() {
Ok(()) => {}
Err(e) => {
io_error!(
bail!(
"Control file {:?} is locked by some other process: {}",
&control_file_path,
e
@@ -404,10 +396,10 @@ impl Timeline {
let my_info = SafeKeeperInfo::unpack(&mut input);
if my_info.magic != SK_MAGIC {
io_error!("Invalid control file magic: {}", my_info.magic);
bail!("Invalid control file magic: {}", my_info.magic);
}
if my_info.format_version != SK_FORMAT_VERSION {
io_error!(
bail!(
"Incompatible format version: {} vs. {}",
my_info.format_version,
SK_FORMAT_VERSION
@@ -532,7 +524,7 @@ impl Connection {
/* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION {
io_error!(
bail!(
"Incompatible protocol version {} vs. {}",
server_info.protocol_version,
SK_PROTOCOL_VERSION
@@ -570,7 +562,7 @@ impl Connection {
self.start_sending();
my_info.server.node_id.pack(&mut self.outbuf);
self.send()?;
io_error!(
bail!(
"Reject connection attempt with term {} because my term is {}",
hex::encode(prop.node_id.term),
hex::encode(my_info.server.node_id.term)
@@ -610,7 +602,7 @@ impl Connection {
/* Receive message header */
let req = self.read_req::<SafeKeeperRequest>()?;
if req.sender_id != my_info.server.node_id {
io_error!("Sender NodeId is changed");
bail!("Sender NodeId is changed");
}
if req.begin_lsn == END_OF_STREAM {
info!("Server stops streaming");
@@ -700,7 +692,7 @@ impl Connection {
if self.inbuf.is_empty() {
return Ok(None);
} else {
io_error!("connection reset by peer");
bail!("connection reset by peer");
}
}
}
@@ -710,11 +702,12 @@ impl Connection {
// Parse libpq message
//
fn parse_message(&mut self) -> Result<Option<FeMessage>> {
if !self.init_done {
FeStartupMessage::parse(&mut self.inbuf)
let msg = if !self.init_done {
FeStartupMessage::parse(&mut self.inbuf)?
} else {
FeMessage::parse(&mut self.inbuf)
}
FeMessage::parse(&mut self.inbuf)?
};
Ok(msg)
}
//
@@ -728,7 +721,7 @@ impl Connection {
// Send buffered messages
//
fn send(&mut self) -> Result<()> {
self.stream.write_all(&self.outbuf)
Ok(self.stream.write_all(&self.outbuf)?)
}
//
@@ -772,7 +765,7 @@ impl Connection {
break;
}
_ => {
io_error!("unexpected message");
bail!("unexpected message");
}
}
}
@@ -845,7 +838,7 @@ impl Connection {
};
let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
io_error!("Can not start replication before connecting to wal_proposer");
bail!("Can not start replication before connecting to wal_proposer");
}
let (wal_end, timeline) = self.find_end_of_wal(false);
if start_pos == 0 {
@@ -913,7 +906,7 @@ impl Connection {
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => {
return Err(e);
return Err(e.into());
}
}
@@ -942,7 +935,7 @@ impl Connection {
Ok(opened_file) => file = opened_file,
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e);
return Err(e.into());
}
}
}
@@ -995,7 +988,7 @@ impl Connection {
} else if q.body.starts_with(b"START_REPLICATION") {
self.handle_start_replication(&q.body)
} else {
io_error!("Unexpected command {:?}", q.body);
bail!("Unexpected command {:?}", q.body);
}
}
@@ -1069,7 +1062,7 @@ impl Connection {
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e);
return Err(e.into());
}
}
}