diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 8048707480..a65703bca9 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -145,21 +145,17 @@ async fn connection_manager_loop_step( let wal_connection = walreceiver_state.wal_connection.as_mut() .expect("Should have a connection, as checked by the corresponding select! guard"); match wal_connection_update { - TaskEvent::Update(c) => { - match c { - TaskStateUpdate::Init | TaskStateUpdate::Started => {}, - TaskStateUpdate::Progress(status) => { - if status.has_processed_wal { - // We have advanced last_record_lsn by processing the WAL received - // from this safekeeper. This is good enough to clean unsuccessful - // retries history and allow reconnecting to this safekeeper without - // sleeping for a long time. - walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id); - } - wal_connection.status = status.to_owned(); - } + TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {}, + TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => { + if new_status.has_processed_wal { + // We have advanced last_record_lsn by processing the WAL received + // from this safekeeper. This is good enough to clean unsuccessful + // retries history and allow reconnecting to this safekeeper without + // sleeping for a long time. + walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id); } - }, + wal_connection.status = new_status; + } TaskEvent::End(walreceiver_task_result) => { match walreceiver_task_result { Ok(()) => debug!("WAL receiving task finished"), @@ -210,7 +206,18 @@ async fn connection_manager_loop_step( } }, - _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {} + Some(()) = async { + match time_until_next_retry { + Some(sleep_time) => { + tokio::time::sleep(sleep_time).await; + Some(()) + }, + None => { + debug!("No candidates to retry, waiting indefinitely for the broker events"); + None + } + } + } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"), } if let Some(new_candidate) = walreceiver_state.next_connection_candidate() { @@ -480,20 +487,25 @@ impl WalreceiverState { .values() .filter_map(|retry| retry.next_retry_at) .filter(|next_retry_at| next_retry_at > &now) - .min(); + .min()?; - next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok()) + (next_retry_at - now).to_std().ok() } /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key. fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) { - self.wal_stream_candidates.insert( - NodeId(timeline_update.safekeeper_id), + let new_safekeeper_id = NodeId(timeline_update.safekeeper_id); + let old_entry = self.wal_stream_candidates.insert( + new_safekeeper_id, BrokerSkTimeline { timeline: timeline_update, latest_update: Utc::now().naive_utc(), }, ); + + if old_entry.is_none() { + info!("New SK node was added: {new_safekeeper_id}"); + } } /// Cleans up stale broker records and checks the rest for the new connection candidate. @@ -720,12 +732,13 @@ impl WalreceiverState { /// Remove candidates which haven't sent broker updates for a while. fn cleanup_old_candidates(&mut self) { let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len()); + let lagging_wal_timeout = self.lagging_wal_timeout; self.wal_stream_candidates.retain(|node_id, broker_info| { if let Ok(time_since_latest_broker_update) = (Utc::now().naive_utc() - broker_info.latest_update).to_std() { - let should_retain = time_since_latest_broker_update < self.lagging_wal_timeout; + let should_retain = time_since_latest_broker_update < lagging_wal_timeout; if !should_retain { node_ids_to_remove.push(*node_id); } @@ -735,8 +748,11 @@ impl WalreceiverState { } }); - for node_id in node_ids_to_remove { - self.wal_connection_retries.remove(&node_id); + if !node_ids_to_remove.is_empty() { + for node_id in node_ids_to_remove { + info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"); + self.wal_connection_retries.remove(&node_id); + } } } @@ -883,10 +899,10 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: connected_sk_id, - status: connection_status.clone(), + status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskStateUpdate::Progress(connection_status.clone())) + .send(TaskStateUpdate::Progress(connection_status)) .ok(); Ok(()) }), @@ -1045,10 +1061,10 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: connected_sk_id, - status: connection_status.clone(), + status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskStateUpdate::Progress(connection_status.clone())) + .send(TaskStateUpdate::Progress(connection_status)) .ok(); Ok(()) }), @@ -1110,10 +1126,10 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: NodeId(1), - status: connection_status.clone(), + status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskStateUpdate::Progress(connection_status.clone())) + .send(TaskStateUpdate::Progress(connection_status)) .ok(); Ok(()) }), diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index cf2a99f1b5..5b7e60aa5e 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -35,7 +35,7 @@ use pq_proto::ReplicationFeedback; use utils::lsn::Lsn; /// Status of the connection. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct WalConnectionStatus { /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running. pub is_connected: bool, @@ -83,7 +83,7 @@ pub async fn handle_walreceiver_connection( streaming_lsn: None, commit_lsn: None, }; - if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) { + if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}"); return Ok(()); } @@ -135,7 +135,7 @@ pub async fn handle_walreceiver_connection( connection_status.latest_connection_update = Utc::now().naive_utc(); connection_status.latest_wal_update = Utc::now().naive_utc(); connection_status.commit_lsn = Some(end_of_wal); - if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) { + if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}"); return Ok(()); } @@ -184,7 +184,20 @@ pub async fn handle_walreceiver_connection( replication_message = physical_stream.next() => replication_message, } } { - let replication_message = replication_message?; + let replication_message = match replication_message { + Ok(message) => message, + Err(replication_error) => { + if replication_error.is_closed() { + info!("Replication stream got closed"); + return Ok(()); + } else { + return Err( + anyhow::Error::new(replication_error).context("replication stream error") + ); + } + } + }; + let now = Utc::now().naive_utc(); let last_rec_lsn_before_msg = last_rec_lsn; @@ -207,7 +220,7 @@ pub async fn handle_walreceiver_connection( } &_ => {} }; - if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) { + if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { warn!("Wal connection event listener dropped, aborting the connection: {e}"); return Ok(()); } @@ -273,8 +286,7 @@ pub async fn handle_walreceiver_connection( if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg { // We have successfully processed at least one WAL record. connection_status.has_processed_wal = true; - if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) - { + if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { warn!("Wal connection event listener dropped, aborting the connection: {e}"); return Ok(()); }