diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 4aadc4885f..9f9e974336 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::min; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; +use std::net::Shutdown; use std::path::Path; use std::sync::Arc; use std::thread::sleep; @@ -90,7 +91,7 @@ impl ReplicationConn { /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. - fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { + fn background_thread(mut stream_in: ReadStream, timeline: Arc) -> Result<()> { let mut state = ReplicaState::new(); let replica = timeline.add_replica(state); let _guard = ReplicationConnGuard { @@ -121,7 +122,12 @@ impl ReplicationConn { } } FeMessage::Sync => {} - FeMessage::CopyFail => return Err(anyhow!("Copy failed")), + FeMessage::CopyFail => { + // Shutdown the connection, because rust-postgres client cannot be dropped + // when connection is alive. + let _ = stream_in.shutdown(Shutdown::Both); + return Err(anyhow!("Copy failed")); + } _ => { // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. info!("unexpected message {:?}", msg); @@ -175,6 +181,8 @@ impl ReplicationConn { let bg_timeline = Arc::clone(swh.timeline.get()); let bg_stream_in = self.stream_in.take().unwrap(); + // TODO: here we got two threads, one for writing WAL and one for receiving + // feedback. If one of them fails, we should shutdown the other one too. let _ = thread::Builder::new() .name("HotStandbyFeedback thread".into()) .spawn(move || {