diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index a4a456d8cb..baf8962ee1 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -42,6 +42,19 @@ pub struct ReplicationConn { stream_in: Option>, } +// TODO: move this to crate::timeline when there's more users +// TODO: design a proper Timeline mock api +trait HsFeedbackSubscriber { + fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {} +} + +impl HsFeedbackSubscriber for Arc { + #[inline(always)] + fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { + Timeline::add_hs_feedback(self, feedback); + } +} + impl ReplicationConn { /// Create a new `ReplicationConn` pub fn new(pgb: &mut PostgresBackend) -> Self { @@ -52,13 +65,16 @@ impl ReplicationConn { /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. - fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { + fn background_thread( + mut stream_in: impl Read, + subscriber: impl HsFeedbackSubscriber, + ) -> Result<()> { // Wait for replica's feedback. while let Some(msg) = FeMessage::read(&mut stream_in)? { match msg { FeMessage::CopyData(m) => { let feedback = HotStandbyFeedback::des(&m)?; - timeline.add_hs_feedback(feedback); + subscriber.add_hs_feedback(feedback); } _ => { // We only handle `CopyData` messages. Anything else is ignored. @@ -212,3 +228,15 @@ impl ReplicationConn { Ok(()) } } + +mod tests { + // A no-op impl for tests + impl super::HsFeedbackSubscriber for () {} + + #[test] + fn test_replication_conn_background_thread_eof() { + // Test that background_thread recognizes EOF + let stream: &[u8] = &[]; + super::ReplicationConn::background_thread(stream, ()).unwrap(); + } +}