mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Improve walkeeper replication error messages & context (#585)
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, Result};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE};
|
||||
@@ -83,7 +83,8 @@ impl ReplicationConn {
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match msg {
|
||||
FeMessage::CopyData(m) => {
|
||||
let feedback = HotStandbyFeedback::des(&m)?;
|
||||
let feedback = HotStandbyFeedback::des(&m)
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
subscriber.add_hs_feedback(feedback);
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
@@ -105,7 +106,7 @@ impl ReplicationConn {
|
||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||
let start_pos = lsns
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("failed to find start LSN"))??;
|
||||
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
|
||||
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0));
|
||||
Ok((start_pos, stop_pos))
|
||||
}
|
||||
@@ -120,13 +121,12 @@ impl ReplicationConn {
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
match File::open(&wal_file_path) {
|
||||
Ok(opened_file) => Ok(opened_file),
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
File::open(&wal_file_path)
|
||||
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
error!("{}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
@@ -144,7 +144,7 @@ impl ReplicationConn {
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) {
|
||||
error!("socket error: {}", err);
|
||||
error!("Replication background thread failed: {}", err);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -154,7 +154,7 @@ impl ReplicationConn {
|
||||
loop {
|
||||
wal_seg_size = swh.timeline.get().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
error!("Can not start replication before connecting to wal_proposer");
|
||||
error!("Cannot start replication before connecting to wal_proposer");
|
||||
sleep(Duration::from_secs(1));
|
||||
} else {
|
||||
break;
|
||||
@@ -216,8 +216,9 @@ impl ReplicationConn {
|
||||
|
||||
// Read some data from the file.
|
||||
let mut file_buf = vec![0u8; send_size];
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
file.read_exact(&mut file_buf)?;
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))
|
||||
.and_then(|_| file.read_exact(&mut file_buf))
|
||||
.context("Failed to read data from WAL file")?;
|
||||
|
||||
// Write some data to the network socket.
|
||||
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
|
||||
@@ -225,7 +226,8 @@ impl ReplicationConn {
|
||||
wal_end: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
data: &file_buf,
|
||||
}))?;
|
||||
}))
|
||||
.context("Failed to send XLogData")?;
|
||||
|
||||
start_pos += send_size as u64;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user