Fix race condition in add_replica

This commit is contained in:
Konstantin Knizhnik
2021-10-19 12:01:24 +03:00
parent 4ee30b6c36
commit 57da296356
4 changed files with 15 additions and 14 deletions

View File

@@ -265,6 +265,9 @@ fn walreceiver_main(
if let Some(last_lsn) = status_update {
// TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(u64::from(last_lsn));
// We are using disk consistent LSN as `write_lsn`, i.e. LSN at which age server
// may guarantee persistence of all received data. Safekeeper is not free to remove
// WAL preceding `write_lsn`: it should not be requested by this page server.
let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::from(0);

View File

@@ -90,7 +90,7 @@ impl ReplicationConn {
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
let mut state = ReplicaState::new();
let replica = timeline.add_replica();
let replica = timeline.add_replica(state);
let _guard = ReplicationConnGuard {
replica,
timeline: timeline.clone(),
@@ -100,8 +100,7 @@ impl ReplicationConn {
match &msg {
FeMessage::CopyData(m) => {
// There's two possible data messages that the client is supposed to send here:
// `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby
// feedback.
// `HotStandbyFeedback` and `StandbyStatusUpdate`.
match m.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {

View File

@@ -89,15 +89,14 @@ impl SharedState {
/// Assign new replica ID. We choose first empty cell in the replicas vector
/// or extend the vector if there are not free items.
pub fn add_replica(&mut self) -> usize {
let len = self.replicas.len();
for i in 0..len {
if self.replicas[i].is_none() {
return i;
}
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
self.replicas[pos] = Some(state);
return pos;
}
self.replicas.push(None);
len
let pos = self.replicas.len();
self.replicas.push(Some(state));
pos
}
/// Restore SharedState from control file. Locks the control file along the
@@ -286,9 +285,9 @@ impl Timeline {
self.mutex.lock().unwrap().sk.s.clone()
}
pub fn add_replica(&self) -> usize {
pub fn add_replica(&self, state: ReplicaState) -> usize {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.add_replica()
shared_state.add_replica(state)
}
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {