From a2498f3e6707f041f03220fc14cc333255274c2d Mon Sep 17 00:00:00 2001 From: Max Sharnoff Date: Tue, 14 Sep 2021 11:59:14 -0700 Subject: [PATCH] Improve walkeeper replication error messages & context (#585) --- walkeeper/src/replication.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 589f4298fa..46bcebbd45 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -3,7 +3,7 @@ use crate::send_wal::SendWalHandler; use crate::timeline::{Timeline, TimelineTools}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use bytes::Bytes; use log::*; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE}; @@ -83,7 +83,8 @@ impl ReplicationConn { while let Some(msg) = FeMessage::read(&mut stream_in)? { match msg { FeMessage::CopyData(m) => { - let feedback = HotStandbyFeedback::des(&m)?; + let feedback = HotStandbyFeedback::des(&m) + .context("failed to deserialize HotStandbyFeedback")?; subscriber.add_hs_feedback(feedback); } FeMessage::Sync => {} @@ -105,7 +106,7 @@ impl ReplicationConn { let mut lsns = caps.map(|cap| cap[1].parse::()); let start_pos = lsns .next() - .ok_or_else(|| anyhow!("failed to find start LSN"))??; + .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); Ok((start_pos, stop_pos)) } @@ -120,13 +121,12 @@ impl ReplicationConn { } // If that failed, try it without the .partial extension. - match File::open(&wal_file_path) { - Ok(opened_file) => Ok(opened_file), - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - Err(e.into()) - } - } + File::open(&wal_file_path) + .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) + .map_err(|e| { + error!("{}", e); + e + }) } /// @@ -144,7 +144,7 @@ impl ReplicationConn { thread::spawn(move || { if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) { - error!("socket error: {}", err); + error!("Replication background thread failed: {}", err); } }); @@ -154,7 +154,7 @@ impl ReplicationConn { loop { wal_seg_size = swh.timeline.get().get_info().server.wal_seg_size as usize; if wal_seg_size == 0 { - error!("Can not start replication before connecting to wal_proposer"); + error!("Cannot start replication before connecting to wal_proposer"); sleep(Duration::from_secs(1)); } else { break; @@ -216,8 +216,9 @@ impl ReplicationConn { // Read some data from the file. let mut file_buf = vec![0u8; send_size]; - file.seek(SeekFrom::Start(xlogoff as u64))?; - file.read_exact(&mut file_buf)?; + file.seek(SeekFrom::Start(xlogoff as u64)) + .and_then(|_| file.read_exact(&mut file_buf)) + .context("Failed to read data from WAL file")?; // Write some data to the network socket. pgb.write_message(&BeMessage::XLogData(XLogDataBody { @@ -225,7 +226,8 @@ impl ReplicationConn { wal_end: end_pos.0, timestamp: get_current_timestamp(), data: &file_buf, - }))?; + })) + .context("Failed to send XLogData")?; start_pos += send_size as u64;