diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 0cb619b9d3..6fd156f868 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -79,7 +79,7 @@ struct ReplicationConnGuard { impl Drop for ReplicationConnGuard { fn drop(&mut self) { - self.timeline.update_replica_state(self.replica, None); + self.timeline.remove_replica(self.replica); } } @@ -120,14 +120,12 @@ impl ReplicationConn { /// This is spawned into the background by `handle_start_replication`. fn background_thread( mut stream_in: ReadStream, - timeline: Arc<Timeline>, - replica_id: usize, + replica_guard: Arc<ReplicationConnGuard>, ) -> Result<()> { + let replica_id = replica_guard.replica; + let timeline = &replica_guard.timeline; + let mut state = ReplicaState::new(); - let _guard = ReplicationConnGuard { - replica: replica_id, - timeline: timeline.clone(), - }; // Wait for replica's feedback. while let Some(msg) = FeMessage::read(&mut stream_in)? { match &msg { @@ -140,7 +138,7 @@ impl ReplicationConn { // Note: deserializing is on m[1..] because we skip the tag byte. state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - timeline.update_replica_state(replica_id, Some(state)); + timeline.update_replica_state(replica_id, state); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) @@ -148,7 +146,7 @@ impl ReplicationConn { state.last_received_lsn = reply.write_lsn; state.disk_consistent_lsn = reply.flush_lsn; state.remote_consistent_lsn = reply.apply_lsn; - timeline.update_replica_state(replica_id, Some(state)); + timeline.update_replica_state(replica_id, state); } _ => warn!("unexpected message {:?}", msg), } @@ -207,16 +205,23 @@ impl ReplicationConn { // This replica_id is used below to check if it's time to stop replication. let replica_id = bg_timeline.add_replica(state); + // Use a guard object to remove our entry from the timeline, when the background + // thread and us have both finished using it. 
+ let replica_guard = Arc::new(ReplicationConnGuard { + replica: replica_id, + timeline: bg_timeline, + }); + let bg_replica_guard = Arc::clone(&replica_guard); + // TODO: here we got two threads, one for writing WAL and one for receiving // feedback. If one of them fails, we should shutdown the other one too. let _ = thread::Builder::new() .name("HotStandbyFeedback thread".into()) .spawn(move || { - if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) { + if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) { error!("Replication background thread failed: {}", err); } - }) - .unwrap(); + })?; let mut wal_seg_size: usize; loop { diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 1b7356538d..c7217b97e8 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -121,7 +121,7 @@ impl SharedState { } /// Assign new replica ID. We choose first empty cell in the replicas vector - /// or extend the vector if there are not free items. + /// or extend the vector if there are no free slots. pub fn add_replica(&mut self, state: ReplicaState) -> usize { if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) { self.replicas[pos] = Some(state); @@ -298,9 +298,15 @@ impl Timeline { shared_state.add_replica(state) } - pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) { + pub fn update_replica_state(&self, id: usize, state: ReplicaState) { let mut shared_state = self.mutex.lock().unwrap(); - shared_state.replicas[id] = state; + shared_state.replicas[id] = Some(state); + } + + pub fn remove_replica(&self, id: usize) { + let mut shared_state = self.mutex.lock().unwrap(); + assert!(shared_state.replicas[id].is_some()); + shared_state.replicas[id] = None; } pub fn get_end_of_wal(&self) -> Lsn {