diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 45195ec706..a521785c37 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -265,6 +265,9 @@ fn walreceiver_main( if let Some(last_lsn) = status_update { // TODO: More thought should go into what values are sent here. let last_lsn = PgLsn::from(u64::from(last_lsn)); + // We are using disk consistent LSN as `write_lsn`, i.e. LSN at which age server + // may guarantee persistence of all received data. Safekeeper is not free to remove + // WAL preceding `write_lsn`: it should not be requested by this page server. let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); let flush_lsn = last_lsn; let apply_lsn = PgLsn::from(0); diff --git a/vendor/postgres b/vendor/postgres index 324a67700b..93b1dd0055 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 324a67700b92460e52ece9e719d1b57b9e63e3b6 +Subproject commit 93b1dd005527f3c82aec2dbf3b220aba8c9eab2c diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index ddb72022e0..29532f9942 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -90,7 +90,7 @@ impl ReplicationConn { /// This is spawned into the background by `handle_start_replication`. fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { let mut state = ReplicaState::new(); - let replica = timeline.add_replica(); + let replica = timeline.add_replica(state); let _guard = ReplicationConnGuard { replica, timeline: timeline.clone(), @@ -100,8 +100,7 @@ impl ReplicationConn { match &msg { FeMessage::CopyData(m) => { // There's two possible data messages that the client is supposed to send here: - // `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby - // feedback. + // `HotStandbyFeedback` and `StandbyStatusUpdate`. match m.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index d2a885e1ce..42e8afabb8 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -89,15 +89,14 @@ impl SharedState { /// Assign new replica ID. We choose first empty cell in the replicas vector /// or extend the vector if there are not free items. - pub fn add_replica(&mut self) -> usize { - let len = self.replicas.len(); - for i in 0..len { - if self.replicas[i].is_none() { - return i; - } + pub fn add_replica(&mut self, state: ReplicaState) -> usize { + if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) { + self.replicas[pos] = Some(state); + return pos; } - self.replicas.push(None); - len + let pos = self.replicas.len(); + self.replicas.push(Some(state)); + pos } /// Restore SharedState from control file. Locks the control file along the @@ -286,9 +285,9 @@ impl Timeline { self.mutex.lock().unwrap().sk.s.clone() } - pub fn add_replica(&self) -> usize { + pub fn add_replica(&self, state: ReplicaState) -> usize { let mut shared_state = self.mutex.lock().unwrap(); - shared_state.add_replica() + shared_state.add_replica(state) } pub fn update_replica_state(&self, id: usize, state: Option) {