Fix race condition leading to panic in walkeeper.

The walkeeper launch two threads for each connection, and uses a guard
object to remove entry from 'replicas' array, when finishes. But only
the background thread held onto the guard object, so if the background
thread finished before the other thread, the array entry would be
removed prematurely, which lead to panic in the check_stop_streaming()
call.

Fixes https://github.com/zenithdb/zenith/issues/1103
This commit is contained in:
Heikki Linnakangas
2022-01-13 11:21:11 +02:00
parent ab4d272149
commit 772d853dcf
2 changed files with 26 additions and 15 deletions

View File

@@ -79,7 +79,7 @@ struct ReplicationConnGuard {
impl Drop for ReplicationConnGuard {
fn drop(&mut self) {
self.timeline.update_replica_state(self.replica, None);
self.timeline.remove_replica(self.replica);
}
}
@@ -120,14 +120,12 @@ impl ReplicationConn {
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(
mut stream_in: ReadStream,
timeline: Arc<Timeline>,
replica_id: usize,
replica_guard: Arc<ReplicationConnGuard>,
) -> Result<()> {
let replica_id = replica_guard.replica;
let timeline = &replica_guard.timeline;
let mut state = ReplicaState::new();
let _guard = ReplicationConnGuard {
replica: replica_id,
timeline: timeline.clone(),
};
// Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? {
match &msg {
@@ -140,7 +138,7 @@ impl ReplicationConn {
// Note: deserializing is on m[1..] because we skip the tag byte.
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
timeline.update_replica_state(replica_id, Some(state));
timeline.update_replica_state(replica_id, state);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
@@ -148,7 +146,7 @@ impl ReplicationConn {
state.last_received_lsn = reply.write_lsn;
state.disk_consistent_lsn = reply.flush_lsn;
state.remote_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica_id, Some(state));
timeline.update_replica_state(replica_id, state);
}
_ => warn!("unexpected message {:?}", msg),
}
@@ -207,16 +205,23 @@ impl ReplicationConn {
// This replica_id is used below to check if it's time to stop replication.
let replica_id = bg_timeline.add_replica(state);
// Use a guard object to remove our entry from the timeline, when the background
// thread and us have both finished using it.
let replica_guard = Arc::new(ReplicationConnGuard {
replica: replica_id,
timeline: bg_timeline,
});
let bg_replica_guard = Arc::clone(&replica_guard);
// TODO: here we got two threads, one for writing WAL and one for receiving
// feedback. If one of them fails, we should shutdown the other one too.
let _ = thread::Builder::new()
.name("HotStandbyFeedback thread".into())
.spawn(move || {
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
error!("Replication background thread failed: {}", err);
}
})
.unwrap();
})?;
let mut wal_seg_size: usize;
loop {

View File

@@ -121,7 +121,7 @@ impl SharedState {
}
/// Assign new replica ID. We choose first empty cell in the replicas vector
/// or extend the vector if there are not free items.
/// or extend the vector if there are no free slots.
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
self.replicas[pos] = Some(state);
@@ -298,9 +298,15 @@ impl Timeline {
shared_state.add_replica(state)
}
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.replicas[id] = state;
shared_state.replicas[id] = Some(state);
}
pub fn remove_replica(&self, id: usize) {
let mut shared_state = self.mutex.lock().unwrap();
assert!(shared_state.replicas[id].is_some());
shared_state.replicas[id] = None;
}
pub fn get_end_of_wal(&self) -> Lsn {