From 1d0706cf250b16f2c5052f446807b83fd41013fe Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Fri, 24 Jun 2022 21:26:53 +0300
Subject: [PATCH] Fix walreceiver connection selection mechanism

* Avoid reconnecting to a safekeeper immediately after its failure by
  limiting candidates to those with the fewest connection attempts.
  Thus we don't have to wait lagging_wal_timeout (10s by default) before
  the switch happens, even if no new changes are generated; the current
  test_restarts_under_load expects some commits to happen within 4s.

* Make the default max_lsn_wal_lag larger, otherwise constant
  reconnections happen during normal work.

* Fix wal_connection_attempts maintenance, preventing a busy loop of
  reconnections.
---
 pageserver/src/tenant_config.rs               |   2 +-
 pageserver/src/walreceiver.rs                 |  29 ++-
 .../src/walreceiver/connection_manager.rs     | 178 +++++++++++++-----
 3 files changed, 160 insertions(+), 49 deletions(-)

diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs
index 1722c1a13a..8811009743 100644
--- a/pageserver/src/tenant_config.rs
+++ b/pageserver/src/tenant_config.rs
@@ -37,7 +37,7 @@ pub mod defaults {
     pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
     pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
     pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
-    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
+    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
 }
 
 /// Per-tenant configuration options
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index fd9468a101..2b5a3123c1 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -178,7 +178,7 @@ async fn shutdown_all_wal_connections(
 /// That may lead to certain events not being observed by the listener.
 #[derive(Debug)]
 struct TaskHandle<E> {
-    handle: JoinHandle<()>,
+    handle: JoinHandle<Result<(), String>>,
     events_receiver: watch::Receiver<TaskEvent<E>>,
     cancellation: watch::Sender<()>,
 }
@@ -205,8 +205,8 @@ impl TaskHandle {
         let sender = Arc::clone(&events_sender);
 
         let handle = tokio::task::spawn(async move {
-            let task_result = task(sender, cancellation_receiver).await;
-            events_sender.send(TaskEvent::End(task_result)).ok();
+            events_sender.send(TaskEvent::Started).ok();
+            task(sender, cancellation_receiver).await
         });
 
         TaskHandle {
@@ -216,6 +216,16 @@
         }
     }
 
+    async fn next_task_event(&mut self) -> TaskEvent<E> {
+        select! {
+            next_task_event = self.events_receiver.changed() => match next_task_event {
+                Ok(()) => self.events_receiver.borrow().clone(),
+                Err(_task_channel_part_dropped) => join_on_handle(&mut self.handle).await,
+            },
+            task_completion_result = join_on_handle(&mut self.handle) => task_completion_result,
+        }
+    }
+
     /// Aborts current task, waiting for it to finish.
     async fn shutdown(self) {
         self.cancellation.send(()).ok();
@@ -225,6 +235,19 @@
     }
 }
 
+async fn join_on_handle<E>(handle: &mut JoinHandle<Result<(), String>>) -> TaskEvent<E> {
+    match handle.await {
+        Ok(task_result) => TaskEvent::End(task_result),
+        Err(e) => {
+            if e.is_cancelled() {
+                TaskEvent::End(Ok(()))
+            } else {
+                TaskEvent::End(Err(format!("WAL receiver task panicked: {e}")))
+            }
+        }
+    }
+}
+
 /// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery.
 /// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled.
 /// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled.
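For orientation before the connection_manager.rs diff: the selection rule it introduces prefers, among all applicable safekeeper candidates, the ones with the fewest recorded connection attempts, and breaks ties by the highest commit LSN. The sketch below is a minimal, self-contained illustration of that rule only; the Candidate struct, the plain u64 LSN, and the select_candidate name are simplifications and not the patch's actual types (the real code is select_connection_candidate, operating on NodeId and SkTimelineInfo).

use std::collections::HashMap;

// Simplified stand-in for a safekeeper advertised via etcd (hypothetical type).
#[derive(Debug, Clone)]
struct Candidate {
    node_id: u64,
    commit_lsn: u64,
}

// Among the candidates, keep only those with the fewest recorded connection
// attempts, then pick the one with the most advanced WAL (highest commit LSN).
fn select_candidate(
    candidates: &[Candidate],
    connection_attempts: &HashMap<u64, u32>,
) -> Option<Candidate> {
    let fewest_attempts = candidates
        .iter()
        .map(|c| connection_attempts.get(&c.node_id).copied().unwrap_or(0))
        .min()?;

    candidates
        .iter()
        .filter(|c| connection_attempts.get(&c.node_id).copied().unwrap_or(0) == fewest_attempts)
        .max_by_key(|c| c.commit_lsn)
        .cloned()
}

fn main() {
    let candidates = vec![
        Candidate { node_id: 0, commit_lsn: 200 },
        Candidate { node_id: 1, commit_lsn: 100 },
    ];
    // Node 0 already has one failed attempt recorded, so node 1 is chosen
    // despite its lower commit LSN.
    let attempts = HashMap::from([(0, 1), (1, 0)]);
    assert_eq!(select_candidate(&candidates, &attempts).unwrap().node_id, 1);
}

Because the patch bumps a safekeeper's attempt counter as soon as its connection task starts and only clears it once replication feedback confirms a working connection, a node that just failed falls out of the lowest-attempt group and another safekeeper is tried immediately, without waiting for lagging_wal_timeout.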
diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs
index d5ca1d5159..614bca50ad 100644
--- a/pageserver/src/walreceiver/connection_manager.rs
+++ b/pageserver/src/walreceiver/connection_manager.rs
@@ -104,49 +104,29 @@ async fn connection_manager_loop_step(
         Some(wal_connection_update) = async {
             match walreceiver_state.wal_connection.as_mut() {
-                Some(wal_connection) => {
-                    let receiver = &mut wal_connection.connection_task.events_receiver;
-                    Some(match receiver.changed().await {
-                        Ok(()) => receiver.borrow().clone(),
-                        Err(_cancellation_error) => TaskEvent::End(Ok(())),
-                    })
-                }
+                Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
                 None => None,
             }
         } => {
-            let (connection_update, reset_connection_attempts) = match &wal_connection_update {
-                TaskEvent::Started => (Some(Utc::now().naive_utc()), true),
-                TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::<Utc>::from(replication_feedback.ps_replytime).naive_utc()), true),
+            let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
+            match &wal_connection_update {
+                TaskEvent::Started => {
+                    wal_connection.latest_connection_update = Utc::now().naive_utc();
+                    *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
+                },
+                TaskEvent::NewEvent(replication_feedback) => {
+                    wal_connection.latest_connection_update = DateTime::<Utc>::from(replication_feedback.ps_replytime).naive_utc();
+                    // Reset connection attempts here only: this is the only place where both nodes
+                    // explicitly confirm with replication feedback that they are connected to each other.
+                    walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
+                },
                 TaskEvent::End(end_result) => {
-                    let should_reset_connection_attempts = match end_result {
-                        Ok(()) => {
-                            debug!("WAL receiving task finished");
-                            true
-                        },
-                        Err(e) => {
-                            warn!("WAL receiving task failed: {e}");
-                            false
-                        },
+                    match end_result {
+                        Ok(()) => debug!("WAL receiving task finished"),
+                        Err(e) => warn!("WAL receiving task failed: {e}"),
                     };
                     walreceiver_state.wal_connection = None;
-                    (None, should_reset_connection_attempts)
                 },
-            };
-
-            if let Some(connection_update) = connection_update {
-                match &mut walreceiver_state.wal_connection {
-                    Some(wal_connection) => {
-                        wal_connection.latest_connection_update = connection_update;
-
-                        let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0);
-                        if reset_connection_attempts {
-                            *attempts_entry = 0;
-                        } else {
-                            *attempts_entry += 1;
-                        }
-                    },
-                    None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"),
-                }
             }
         },
@@ -406,10 +386,8 @@ impl WalreceiverState {
             Some(existing_wal_connection) => {
                 let connected_sk_node = existing_wal_connection.sk_id;
 
-                let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self
-                    .applicable_connection_candidates()
-                    .filter(|&(sk_id, _, _)| sk_id != connected_sk_node)
-                    .max_by_key(|(_, info, _)| info.commit_lsn)?;
+                let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
+                    self.select_connection_candidate(Some(connected_sk_node))?;
 
                 let now = Utc::now().naive_utc();
                 if let Ok(latest_interaciton) =
@@ -462,9 +440,8 @@
                 }
             }
             None => {
-                let (new_sk_id, _, new_wal_producer_connstr) = self
-                    .applicable_connection_candidates()
-                    .max_by_key(|(_, info, _)| info.commit_lsn)?;
+                let (new_sk_id, _, new_wal_producer_connstr) =
+                    self.select_connection_candidate(None)?;
                 return Some(NewWalConnectionCandidate {
                     safekeeper_id: new_sk_id,
                     wal_producer_connstr: new_wal_producer_connstr,
@@ -476,6 +453,49 @@
             }
         }
         None
     }
 
+    /// Selects the best possible candidate, based on the data collected from etcd updates about the safekeepers.
+    /// Optionally omits the given node, to support gracefully switching from a healthy safekeeper to another.
+    ///
+    /// The candidate that is chosen:
+    /// * has the fewest connection attempts from the pageserver to the safekeeper node (reset every time WAL replication feedback is sent)
+    /// * has the greatest data LSN among the ones that are left
+    ///
+    /// NOTE:
+    /// We evict timeline data received from etcd based on the time passed since it was registered, along with its connection attempt counters;
+    /// otherwise, resetting the connection attempts requires a successful connection to that node.
+    /// That won't happen until all nodes with fewer connection attempts have been connected to first, which might leave the safekeeper node with the more advanced state ignored.
+    fn select_connection_candidate(
+        &self,
+        node_to_omit: Option<NodeId>,
+    ) -> Option<(NodeId, &SkTimelineInfo, String)> {
+        let all_candidates = self
+            .applicable_connection_candidates()
+            .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
+            .collect::<Vec<_>>();
+
+        let smallest_attempts_allowed = all_candidates
+            .iter()
+            .map(|(sk_id, _, _)| {
+                self.wal_connection_attempts
+                    .get(sk_id)
+                    .copied()
+                    .unwrap_or(0)
+            })
+            .min()?;
+
+        all_candidates
+            .into_iter()
+            .filter(|(sk_id, _, _)| {
+                smallest_attempts_allowed
+                    >= self
+                        .wal_connection_attempts
+                        .get(sk_id)
+                        .copied()
+                        .unwrap_or(0)
+            })
+            .max_by_key(|(_, info, _)| info.commit_lsn)
+    }
+
     fn applicable_connection_candidates(
         &self,
     ) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
@@ -500,15 +520,25 @@
     }
 
     fn cleanup_old_candidates(&mut self) {
-        self.wal_stream_candidates.retain(|_, etcd_info| {
+        let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
+
+        self.wal_stream_candidates.retain(|node_id, etcd_info| {
             if let Ok(time_since_latest_etcd_update) =
                 (Utc::now().naive_utc() - etcd_info.latest_update).to_std()
             {
-                time_since_latest_etcd_update < self.lagging_wal_timeout
+                let should_retain = time_since_latest_etcd_update < self.lagging_wal_timeout;
+                if !should_retain {
+                    node_ids_to_remove.push(*node_id);
+                }
+                should_retain
             } else {
                 true
             }
         });
+
+        for node_id in node_ids_to_remove {
+            self.wal_connection_attempts.remove(&node_id);
+        }
     }
 }
@@ -843,6 +873,64 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
+        let harness = RepoHarness::create("candidate_with_many_connection_failures")?;
+        let mut state = dummy_state(&harness);
+        let now = Utc::now().naive_utc();
+
+        let current_lsn = Lsn(100_000).align();
+        let bigger_lsn = Lsn(current_lsn.0 + 100).align();
+
+        state.wal_connection = None;
+        state.wal_stream_candidates = HashMap::from([
+            (
+                NodeId(0),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+                        commit_lsn: Some(bigger_lsn),
+                        backup_lsn: None,
+                        remote_consistent_lsn: None,
+                        peer_horizon_lsn: None,
+                        safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
+                    },
+                    etcd_version: 0,
+                    latest_update: now,
+                },
+            ),
+            (
+                NodeId(1),
+                EtcdSkTimeline {
+                    timeline: SkTimelineInfo {
+                        last_log_term: None,
+                        flush_lsn: None,
+                        commit_lsn: Some(current_lsn),
+                        backup_lsn: None,
+                        remote_consistent_lsn: None,
+                        peer_horizon_lsn: None,
+                        safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
+                    },
+                    etcd_version: 0,
+                    latest_update: now,
+                },
+            ),
+        ]);
+        state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
+
+        let candidate_with_fewer_errors = state
+            .next_connection_candidate()
+            .expect("Expected one candidate selected, but got none");
+        assert_eq!(
+            candidate_with_fewer_errors.safekeeper_id,
+            NodeId(1),
+            "Should select the node with fewer connection errors"
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> {
         let harness = RepoHarness::create("connection_no_etcd_data_candidate")?;