From 1d16ee92d4a41882047902a0cf9eaab7f9332ce6 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Thu, 2 Jun 2022 21:21:01 +0300
Subject: [PATCH] Fix the Lsn difference reconnection

---
 pageserver/src/tenant_config.rs |   2 +-
 pageserver/src/walreceiver.rs   | 188 +++++++++++++++++---------------
 2 files changed, 99 insertions(+), 91 deletions(-)

diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs
index f68a820e95..1722c1a13a 100644
--- a/pageserver/src/tenant_config.rs
+++ b/pageserver/src/tenant_config.rs
@@ -37,7 +37,7 @@ pub mod defaults {
     pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
     pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
     pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
-    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000;
+    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
 }
 
 /// Per-tenant configuration options
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 527fb137cd..11c8617a57 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -478,7 +478,11 @@ async fn timeline_wal_broker_loop_step(
                     None => debug!("No connection candidate was selected for timeline"),
                 }
             }
-            None => warn!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()),
+            // XXX: If we subscribe for a certain timeline, we expect only its data to come.
+            // But somebody could propagate a new etcd key that has the same prefix as the subscribed one, and then we'll get odd data.
+            // This is an error: we don't want overlapping prefixes for timelines, but we can complain and throw the odd data away instead of panicking,
+            // since the next poll might bring the correct data.
+            None => error!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()),
         }
     },
     None => {
@@ -625,18 +629,28 @@ impl WalConnectionManager {
     /// Checks current state against every fetched safekeeper state of a given timeline.
     /// Returns a new candidate, if the current state is somewhat lagging, or `None` otherwise.
     /// The current rules for approving new candidates:
-    /// * pick the safekeeper with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline
-    /// * if the leader is a different SK and either
-    ///   * no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) — reconnect
-    ///   * same time amount had passed since the connection, WAL updates happened recently, but the new leader SK has timeline Lsn way ahead of the old one — reconnect
+    /// * pick the entry for the currently connected safekeeper (if any) out of the etcd input data
+    /// * out of the remaining input entries, pick the one with the biggest `commit_lsn` that's ahead of the pageserver's latest Lsn for the timeline
+    ///   * if there's no such entry, no new candidate is found, abort
+    /// * otherwise, check whether the etcd updates contain the currently connected safekeeper
+    ///   * if they don't, that means no WAL updates arrived for a certain time (either none since the connection time or none since the last event after the connection).
+    ///     Reconnect if that time exceeds the threshold.
+    ///   * if they do, compare the connected safekeeper's Lsn with the candidate's, and reconnect if the candidate's advantage is over the threshold
     ///
     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     /// Both thresholds are configured per tenant.
     fn select_connection_candidate(
         &self,
-        safekeeper_timelines: HashMap<NodeId, SkTimelineInfo>,
+        mut safekeeper_timelines: HashMap<NodeId, SkTimelineInfo>,
     ) -> Option<NewWalConnectionCandidate> {
-        let (&new_sk_id, new_sk_timeline, new_wal_producer_connstr) = safekeeper_timelines
+        let current_sk_data_updated =
+            self.wal_connection_data
+                .as_ref()
+                .and_then(|connection_data| {
+                    safekeeper_timelines.remove(&connection_data.safekeeper_id)
+                });
+
+        let candidate_sk_data = safekeeper_timelines
             .iter()
             .filter(|(_, info)| {
                 info.commit_lsn > Some(self.timeline.tline.get_last_record_lsn())
@@ -654,68 +668,78 @@ impl WalConnectionManager {
                 }
             }
         })
-        .max_by_key(|(_, info, _)| info.commit_lsn)?;
+        .max_by_key(|(_, info, _)| info.commit_lsn);
 
-        match self.wal_connection_data.as_ref() {
-            None => Some(NewWalConnectionCandidate {
-                safekeeper_id: new_sk_id,
-                wal_producer_connstr: new_wal_producer_connstr,
-                reason: ReconnectReason::NoExistingConnection,
-            }),
-            Some(current_connection) => {
-                if current_connection.safekeeper_id == new_sk_id {
-                    None
-                } else {
-                    self.reason_to_reconnect(current_connection, new_sk_timeline)
-                        .map(|reason| NewWalConnectionCandidate {
-                            safekeeper_id: new_sk_id,
-                            wal_producer_connstr: new_wal_producer_connstr,
-                            reason,
-                        })
+        match (current_sk_data_updated, candidate_sk_data) {
+            // No better candidate than the one we're already connected to:
+            // whatever data update comes for the connected one, we have no better candidate to switch to
+            (_, None) => None,
+
+            // No updates from the old SK in this batch, but some candidate is available:
+            // check how long ago we received updates from the current SK, and switch connections if it's over the threshold
+            (None, Some((&new_sk_id, _, new_wal_producer_connstr))) => {
+                match self.wal_connection_data.as_ref() {
+                    Some(current_connection) => {
+                        let last_sk_interaction_time =
+                            match current_connection.last_wal_receiver_data.as_ref() {
+                                Some((_, data_submission_time)) => *data_submission_time,
+                                None => current_connection.connection_init_time,
+                            };
+
+                        let now = Utc::now().naive_utc();
+                        match (now - last_sk_interaction_time).to_std() {
+                            Ok(last_interaction) => {
+                                if last_interaction > self.lagging_wal_timeout {
+                                    return Some(NewWalConnectionCandidate {
+                                        safekeeper_id: new_sk_id,
+                                        wal_producer_connstr: new_wal_producer_connstr,
+                                        reason: ReconnectReason::NoWalTimeout {
+                                            last_wal_interaction: last_sk_interaction_time,
+                                            check_time: now,
+                                            threshold: self.lagging_wal_timeout,
+                                        },
+                                    });
+                                }
+                            }
+                            Err(_e) => {
+                                warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. Interaction time: {last_sk_interaction_time}, now: {now}", current_connection.safekeeper_id);
+                            }
+                        }
+                        None
+                    }
+                    None => Some(NewWalConnectionCandidate {
+                        safekeeper_id: new_sk_id,
+                        wal_producer_connstr: new_wal_producer_connstr,
+                        reason: ReconnectReason::NoExistingConnection,
+                    }),
                 }
             }
-        }
-    }
-
-    fn reason_to_reconnect(
-        &self,
-        current_connection: &WalConnectionData,
-        new_sk_timeline: &SkTimelineInfo,
-    ) -> Option<ReconnectReason> {
-        let last_sk_interaction_time = match current_connection.last_wal_receiver_data.as_ref() {
-            Some((last_wal_receiver_data, data_submission_time)) => {
-                let new_lsn = new_sk_timeline.commit_lsn?;
-                match new_lsn.0.checked_sub(last_wal_receiver_data.ps_writelsn)
+            // Both: the current SK got updated via etcd and there's another candidate with a suitable Lsn:
+            // check how far ahead the new SK's Lsn is compared to the current SK's, and switch connections if it's over the threshold
+            (
+                Some(current_sk_timeline),
+                Some((&new_sk_id, new_sk_timeline, new_wal_producer_connstr)),
+            ) => {
+                let new_lsn = new_sk_timeline.commit_lsn.unwrap_or(Lsn(0));
+                let current_lsn = current_sk_timeline.commit_lsn.unwrap_or(Lsn(0));
+                match new_lsn.0.checked_sub(current_lsn.0)
                 {
-                    Some(sk_lsn_advantage) => {
-                        if sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
-                            return Some(ReconnectReason::LaggingWal { current_lsn: Lsn(last_wal_receiver_data.ps_writelsn), new_lsn, threshold: self.max_lsn_wal_lag });
+                    Some(new_sk_lsn_advantage) => {
+                        if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
+                            return Some(
+                                NewWalConnectionCandidate {
+                                    safekeeper_id: new_sk_id,
+                                    wal_producer_connstr: new_wal_producer_connstr,
+                                    reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
+                                });
                         }
                     }
                     None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"),
                 }
-                *data_submission_time
-            }
-            None => current_connection.connection_init_time,
-        };
-        let now = Utc::now().naive_utc();
-        match (now - last_sk_interaction_time).to_std() {
-            Ok(last_interaction) => {
-                if last_interaction > self.lagging_wal_timeout {
-                    return Some(ReconnectReason::NoWalTimeout {
-                        last_wal_interaction: last_sk_interaction_time,
-                        check_time: now,
-                        threshold: self.lagging_wal_timeout,
-                    });
-                }
-            }
-            Err(_e) => {
-                warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. Interaction time: {last_sk_interaction_time}, now: {now}",
-                    current_connection.safekeeper_id);
+                None
             }
         }
-        None
     }
 }
@@ -1017,7 +1041,7 @@ mod tests {
 
         let mut data_manager_with_connection = dummy_wal_connection_manager(&harness);
         let connected_sk_id = NodeId(0);
-        let mut dummy_connection_data = dummy_connection_data(id, NodeId(0)).await;
+        let mut dummy_connection_data = dummy_connection_data(id, connected_sk_id).await;
         let lagging_wal_timeout =
             chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?;
         let time_over_threshold =
@@ -1092,8 +1116,8 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn timeout_wal_over_threshcurrent_candidate() -> anyhow::Result<()> {
-        let harness = RepoHarness::create("timeout_wal_over_threshcurrent_candidate")?;
+    async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
+        let harness = RepoHarness::create("timeout_wal_over_threshold_current_candidate")?;
         let current_lsn = Lsn(100_000).align();
 
         let id = ZTenantTimelineId {
@@ -1111,36 +1135,20 @@ mod tests {
         dummy_connection_data.connection_init_time = time_over_threshold;
         data_manager_with_connection.wal_connection_data = Some(dummy_connection_data);
 
-        let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1);
         let over_threshcurrent_candidate = data_manager_with_connection
-            .select_connection_candidate(HashMap::from([
-                (
-                    NodeId(0),
-                    SkTimelineInfo {
-                        last_log_term: None,
-                        flush_lsn: None,
-                        commit_lsn: Some(new_lsn),
-                        backup_lsn: None,
-                        remote_consistent_lsn: None,
-                        peer_horizon_lsn: None,
-                        safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                        pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
-                    },
-                ),
-                (
-                    NodeId(1),
-                    SkTimelineInfo {
-                        last_log_term: None,
-                        flush_lsn: None,
-                        commit_lsn: Some(current_lsn),
-                        backup_lsn: None,
-                        remote_consistent_lsn: None,
-                        peer_horizon_lsn: None,
-                        safekeeper_connstr: Some("not advanced by Lsn safekeeper".to_string()),
-                        pageserver_connstr: Some("not advanced by Lsn safekeeper".to_string()),
-                    },
-                ),
-            ]))
+            .select_connection_candidate(HashMap::from([(
+                NodeId(0),
+                SkTimelineInfo {
+                    last_log_term: None,
+                    flush_lsn: None,
+                    commit_lsn: Some(current_lsn),
+                    backup_lsn: None,
+                    remote_consistent_lsn: None,
+                    peer_horizon_lsn: None,
+                    safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
+                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
+                },
+            )]))
             .expect(
                 "Expected one candidate selected out of multiple valid data options, but got none",
            );
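
Note for reviewers: below is a minimal, self-contained sketch of the decision rule that the new `match (current_sk_data_updated, candidate_sk_data)` block implements, reduced to its two thresholds. The names `should_reconnect` and `Reconnect`, and the plain `u64`/`Instant` stand-ins, are hypothetical and not the actual pageserver types; the patch applies the timeout branch only when the connected safekeeper sent no etcd updates in the polled batch, while this sketch folds both checks into one function.

use std::time::{Duration, Instant};

// Hypothetical, reduced mirror of ReconnectReason from the patch.
#[derive(Debug, PartialEq)]
enum Reconnect {
    LaggingWal { advantage: u64 },
    NoWalTimeout { silent_for: Duration },
}

// Decide whether to switch from the currently connected safekeeper to a candidate.
fn should_reconnect(
    current_commit_lsn: u64,
    candidate_commit_lsn: u64,
    last_interaction: Instant,
    max_lsn_wal_lag: u64,
    lagging_wal_timeout: Duration,
) -> Option<Reconnect> {
    // Same checked_sub pattern as the patch: a candidate that is *behind*
    // the current safekeeper yields None and never counts as an advantage.
    if let Some(advantage) = candidate_commit_lsn.checked_sub(current_commit_lsn) {
        if advantage >= max_lsn_wal_lag {
            return Some(Reconnect::LaggingWal { advantage });
        }
    }
    // Reconnect if the current connection has been silent for too long.
    let silent_for = last_interaction.elapsed();
    if silent_for > lagging_wal_timeout {
        return Some(Reconnect::NoWalTimeout { silent_for });
    }
    None
}

fn main() {
    // With the new 10_000-byte default lag threshold, a candidate 15_000
    // bytes ahead triggers a reconnection; 5_000 bytes ahead does not.
    let now = Instant::now();
    let lag = 10_000;
    let timeout = Duration::from_secs(10);
    assert!(matches!(
        should_reconnect(100_000, 115_000, now, lag, timeout),
        Some(Reconnect::LaggingWal { advantage: 15_000 })
    ));
    assert_eq!(should_reconnect(100_000, 105_000, now, lag, timeout), None);
}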