diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs
index 0f11a2197a..e8e0a7c52b 100644
--- a/pageserver/src/walreceiver/connection_manager.rs
+++ b/pageserver/src/walreceiver/connection_manager.rs
@@ -96,6 +96,8 @@ async fn connection_manager_loop_step(
     info!("Subscribed for etcd timeline changes, waiting for new etcd data");

     loop {
+        let time_until_next_retry = walreceiver_state.time_until_next_retry();
+
         select! {
             broker_connection_result = &mut broker_subscription.watcher_handle => {
                 cleanup_broker_connection(broker_connection_result, walreceiver_state);
@@ -110,27 +112,23 @@ async fn connection_manager_loop_step(
             } => {
                 let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
                 match wal_connection_update {
-                    TaskEvent::Started => {
-                        *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
-                    },
+                    TaskEvent::Started => {},
                     TaskEvent::NewEvent(status) => {
-                        if status.has_received_wal {
-                            // Reset connection attempts here only, we know that safekeeper is healthy
-                            // because it can send us a WAL update.
-                            walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
+                        if status.has_processed_wal {
+                            // We have advanced last_record_lsn by processing the WAL received
+                            // from this safekeeper. This is good enough to clean unsuccessful
+                            // retries history and allow reconnecting to this safekeeper without
+                            // sleeping for a long time.
+                            walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
                         }
                         wal_connection.status = status;
                     },
                     TaskEvent::End(end_result) => {
                         match end_result {
                             Ok(()) => debug!("WAL receiving task finished"),
-                            Err(e) => {
-                                warn!("WAL receiving task failed: {e}");
-                                // If the task failed, set the connection attempts to at least 1, to try other safekeepers.
-                                let _ = *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(1);
-                            }
+                            Err(e) => warn!("WAL receiving task failed: {e}"),
                         };
-                        walreceiver_state.wal_connection = None;
+                        walreceiver_state.drop_old_connection(false).await;
                     },
                 }
             },
@@ -154,6 +152,8 @@ async fn connection_manager_loop_step(
                     }
                 }
             },
+
+            _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
         }

         // Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly.
@@ -234,6 +234,10 @@ async fn subscribe_for_timeline_updates(
     }
 }

+const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
+const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
+const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
+
 /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
 struct WalreceiverState {
     id: ZTenantTimelineId,
@@ -247,7 +251,8 @@ struct WalreceiverState {
     max_lsn_wal_lag: NonZeroU64,
     /// Current connection to safekeeper for WAL streaming.
     wal_connection: Option<WalConnection>,
-    wal_connection_attempts: HashMap<NodeId, u32>,
+    /// Info about retries and unsuccessful attempts to connect to safekeepers.
+    wal_connection_retries: HashMap<NodeId, RetryInfo>,
     /// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id.
     wal_stream_candidates: HashMap<NodeId, EtcdSkTimeline>,
 }
@@ -255,6 +260,8 @@ struct WalreceiverState {
 /// Current connection data.
 #[derive(Debug)]
 struct WalConnection {
+    /// Time when the connection was initiated.
+    started_at: NaiveDateTime,
     /// Current safekeeper pageserver is connected to for WAL streaming.
     sk_id: NodeId,
     /// Status of the connection.
@@ -274,6 +281,12 @@ struct NewCommittedWAL {
     discovered_at: NaiveDateTime,
 }

+#[derive(Debug)]
+struct RetryInfo {
+    next_retry_at: Option<NaiveDateTime>,
+    retry_duration_seconds: f64,
+}
+
 /// Data about the timeline to connect to, received from etcd.
 #[derive(Debug)]
 struct EtcdSkTimeline {
@@ -300,31 +313,18 @@ impl WalreceiverState {
             max_lsn_wal_lag,
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
-            wal_connection_attempts: HashMap::new(),
+            wal_connection_retries: HashMap::new(),
         }
     }

     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
-        if let Some(old_connection) = self.wal_connection.take() {
-            old_connection.connection_task.shutdown().await
-        }
+        self.drop_old_connection(true).await;

         let id = self.id;
         let connect_timeout = self.wal_connect_timeout;
-        let connection_attempt = self
-            .wal_connection_attempts
-            .get(&new_sk_id)
-            .copied()
-            .unwrap_or(0);
         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
             async move {
-                exponential_backoff(
-                    connection_attempt,
-                    DEFAULT_BASE_BACKOFF_SECONDS,
-                    DEFAULT_MAX_BACKOFF_SECONDS,
-                )
-                .await;
                 super::walreceiver_connection::handle_walreceiver_connection(
                     id,
                     &new_wal_source_connstr,
@@ -340,10 +340,11 @@ impl WalreceiverState {

         let now = Utc::now().naive_utc();
         self.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: new_sk_id,
             status: WalConnectionStatus {
                 is_connected: false,
-                has_received_wal: false,
+                has_processed_wal: false,
                 latest_connection_update: now,
                 latest_wal_update: now,
                 streaming_lsn: None,
@@ -354,6 +355,71 @@ impl WalreceiverState {
         });
     }

+    /// Drops the current connection (if any) and updates retry timeout for the next
+    /// connection attempt to the same safekeeper.
+    async fn drop_old_connection(&mut self, needs_shutdown: bool) {
+        let wal_connection = match self.wal_connection.take() {
+            Some(wal_connection) => wal_connection,
+            None => return,
+        };
+
+        if needs_shutdown {
+            wal_connection.connection_task.shutdown().await;
+        }
+
+        let retry = self
+            .wal_connection_retries
+            .entry(wal_connection.sk_id)
+            .or_insert(RetryInfo {
+                next_retry_at: None,
+                retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
+            });
+
+        let now = Utc::now().naive_utc();
+
+        // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
+        // and we add backoff to the time when we started the connection attempt. If the connection
+        // was active for a long time, then next_retry_at will be in the past.
+        retry.next_retry_at =
+            wal_connection
+                .started_at
+                .checked_add_signed(chrono::Duration::milliseconds(
+                    (retry.retry_duration_seconds * 1000.0) as i64,
+                ));
+
+        if let Some(next) = &retry.next_retry_at {
+            if next > &now {
+                info!(
+                    "Next connection retry to {:?} is at {}",
+                    wal_connection.sk_id, next
+                );
+            }
+        }
+
+        let next_retry_duration =
+            retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
+        // Clamp the next retry duration to the maximum allowed.
+        let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
+        // Clamp the next retry duration to the minimum allowed.
+        let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
+
+        retry.retry_duration_seconds = next_retry_duration;
+    }
+
+    /// Returns time needed to wait to have a new candidate for WAL streaming.
+    fn time_until_next_retry(&self) -> Option<Duration> {
+        let now = Utc::now().naive_utc();
+
+        let next_retry_at = self
+            .wal_connection_retries
+            .values()
+            .filter_map(|retry| retry.next_retry_at)
+            .filter(|next_retry_at| next_retry_at > &now)
+            .min();
+
+        next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
+    }
+
     /// Adds another etcd timeline into the state, if its more recent than the one already added there for the same key.
     fn register_timeline_update(&mut self, timeline_update: BrokerUpdate) {
         match self
@@ -547,52 +613,37 @@ impl WalreceiverState {
     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     ///
     /// The candidate that is chosen:
-    /// * has fewest connection attempts from pageserver to safekeeper node (reset every time we receive a WAL message from the node)
-    /// * has greatest data Lsn among the ones that are left
-    ///
-    /// NOTE:
-    /// We evict timeline data received from etcd based on time passed since it was registered, along with its connection attempts values, but
-    /// otherwise to reset the connection attempts, a successful connection to that node is needed.
-    /// That won't happen now, before all nodes with less connection attempts are connected to first, which might leave the sk node with more advanced state to be ignored.
+    /// * has no pending retry cooldown
+    /// * has greatest commit_lsn among the ones that are left
     fn select_connection_candidate(
         &self,
         node_to_omit: Option<NodeId>,
     ) -> Option<(NodeId, &SkTimelineInfo, String)> {
-        let all_candidates = self
-            .applicable_connection_candidates()
+        self.applicable_connection_candidates()
             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
-            .collect::<Vec<_>>();
-
-        let smallest_attempts_allowed = all_candidates
-            .iter()
-            .map(|(sk_id, _, _)| {
-                self.wal_connection_attempts
-                    .get(sk_id)
-                    .copied()
-                    .unwrap_or(0)
-            })
-            .min()?;
-
-        all_candidates
-            .into_iter()
-            .filter(|(sk_id, _, _)| {
-                smallest_attempts_allowed
-                    >= self
-                        .wal_connection_attempts
-                        .get(sk_id)
-                        .copied()
-                        .unwrap_or(0)
-            })
             .max_by_key(|(_, info, _)| info.commit_lsn)
     }

     /// Returns a list of safekeepers that have valid info and ready for connection.
+    /// Some safekeepers are filtered by the retry cooldown.
     fn applicable_connection_candidates(
         &self,
     ) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
+        let now = Utc::now().naive_utc();
+
         self.wal_stream_candidates
             .iter()
             .filter(|(_, info)| info.timeline.commit_lsn.is_some())
+            .filter(move |(sk_id, _)| {
+                let next_retry_at = self
+                    .wal_connection_retries
+                    .get(sk_id)
+                    .and_then(|retry_info| {
+                        retry_info.next_retry_at
+                    });
+
+                next_retry_at.is_none() || next_retry_at.unwrap() <= now
+            })
             .filter_map(|(sk_id, etcd_info)| {
                 let info = &etcd_info.timeline;
                 match wal_stream_connection_string(
@@ -627,7 +678,7 @@ impl WalreceiverState {
         });

         for node_id in node_ids_to_remove {
-            self.wal_connection_attempts.remove(&node_id);
+            self.wal_connection_retries.remove(&node_id);
         }
     }
 }
@@ -684,7 +735,6 @@ fn wal_stream_connection_string(

 #[cfg(test)]
 mod tests {
-
     use crate::repository::{
         repo_harness::{RepoHarness, TIMELINE_ID},
         Repository,
@@ -789,7 +839,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: now,
             commit_lsn: Some(Lsn(current_lsn)),
@@ -798,6 +848,7 @@ mod tests {
         state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: connected_sk_id,
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1017,7 +1068,13 @@ mod tests {
                 },
             ),
         ]);
-        state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
+        state.wal_connection_retries = HashMap::from([(
+            NodeId(0),
+            RetryInfo {
+                next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
+                retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
+            },
+        )]);

         let candidate_with_less_errors = state
             .next_connection_candidate()

         assert_eq!(
             candidate_with_less_errors.safekeeper_id,
             NodeId(1),
-            "Should select the node with less connection errors"
+            "Should select the node with no pending retry cooldown"
         );

         Ok(())
@@ -1043,7 +1100,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: now,
             commit_lsn: Some(current_lsn),
         };

         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: connected_sk_id,
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1130,7 +1188,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: time_over_threshold,
             latest_wal_update: time_over_threshold,
             commit_lsn: Some(current_lsn),
         };

         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: NodeId(1),
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1202,7 +1261,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: time_over_threshold,
             commit_lsn: Some(current_lsn),
         };

         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: NodeId(1),
             status: connection_status,
             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
@@ -1281,7 +1341,7 @@ mod tests {
             max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
-            wal_connection_attempts: HashMap::new(),
+            wal_connection_retries: HashMap::new(),
         }
     }
 }
diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs
index 0688086117..025bfeb506 100644
--- a/pageserver/src/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/walreceiver/walreceiver_connection.rs
@@ -35,8 +35,9 @@ use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
 pub struct WalConnectionStatus {
     /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
     pub is_connected: bool,
-    /// Defines a healthy connection as one on which we have received at least some WAL bytes.
-    pub has_received_wal: bool,
+    /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
+    /// and is able to process it in walingest without errors.
+    pub has_processed_wal: bool,
     /// Connection establishment time or the timestamp of a latest connection message received.
     pub latest_connection_update: NaiveDateTime,
     /// Time of the latest WAL message received.
@@ -71,7 +72,7 @@ pub async fn handle_walreceiver_connection(
     info!("connected!");
     let mut connection_status = WalConnectionStatus {
         is_connected: true,
-        has_received_wal: false,
+        has_processed_wal: false,
         latest_connection_update: Utc::now().naive_utc(),
         latest_wal_update: Utc::now().naive_utc(),
         streaming_lsn: None,
@@ -117,13 +118,6 @@ pub async fn handle_walreceiver_connection(
     let identify = identify_system(&mut replication_client).await?;
     info!("{identify:?}");

-    connection_status.latest_connection_update = Utc::now().naive_utc();
-    if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
-        warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
-        return Ok(());
-    }
-
-    // NB: this is a flush_lsn, not a commit_lsn.
     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     let mut caught_up = false;
     let ZTenantTimelineId {
@@ -131,6 +125,14 @@ pub async fn handle_walreceiver_connection(
         tenant_id,
         timeline_id,
     } = id;
+    connection_status.latest_connection_update = Utc::now().naive_utc();
+    connection_status.latest_wal_update = Utc::now().naive_utc();
+    connection_status.commit_lsn = Some(end_of_wal);
+    if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
+        warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
+        return Ok(());
+    }
+
     let (repo, timeline) = tokio::task::spawn_blocking(move || {
         let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
             .with_context(|| format!("no repository found for tenant {tenant_id}"))?;
@@ -181,6 +183,7 @@ pub async fn handle_walreceiver_connection(
         }
     } {
         let replication_message = replication_message?;
         let now = Utc::now().naive_utc();
+        let last_rec_lsn_before_msg = last_rec_lsn;

         // Update the connection status before processing the message. If the message processing
         // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
@@ -193,7 +196,6 @@ pub async fn handle_walreceiver_connection(
                 ));
                 if !xlog_data.data().is_empty() {
                     connection_status.latest_wal_update = now;
-                    connection_status.has_received_wal = true;
                 }
             }
             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
@@ -265,6 +267,15 @@ pub async fn handle_walreceiver_connection(
             _ => None,
         };

+        if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
+            // We have successfully processed at least one WAL record.
+            connection_status.has_processed_wal = true;
+            if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
+                warn!("Wal connection event listener dropped, aborting the connection: {e}");
+                return Ok(());
+            }
+        }
+
         let timeline_to_check = Arc::clone(&timeline);
         tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
             .await
diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs
index 63bc9bd517..c90c2a0446 100644
--- a/safekeeper/src/handler.rs
+++ b/safekeeper/src/handler.rs
@@ -90,7 +90,10 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
     fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> {
         let cmd = parse_cmd(query_string)?;

-        info!("got query {:?}", query_string);
+        info!(
+            "got query {:?} in timeline {:?}",
+            query_string, self.ztimelineid
+        );

         let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. })
             || matches!(cmd, SafekeeperPostgresCommand::IdentifySystem));
@@ -106,23 +109,17 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
         }

         match cmd {
-            SafekeeperPostgresCommand::StartWalPush => {
-                ReceiveWalConn::new(pgb)
-                    .run(self)
-                    .context("failed to run ReceiveWalConn")?;
-            }
-            SafekeeperPostgresCommand::StartReplication { start_lsn } => {
-                ReplicationConn::new(pgb)
-                    .run(self, pgb, start_lsn)
-                    .context("failed to run ReplicationConn")?;
-            }
-            SafekeeperPostgresCommand::IdentifySystem => {
-                self.handle_identify_system(pgb)?;
-            }
-            SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
-                handle_json_ctrl(self, pgb, cmd)?;
-            }
+            SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb)
+                .run(self)
+                .context("failed to run ReceiveWalConn"),
+            SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb)
+                .run(self, pgb, start_lsn)
+                .context("failed to run ReplicationConn"),
+            SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb),
+            SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd),
         }
+        .context(format!("timeline {timelineid}"))?;
+        Ok(())
     }
 }
@@ -153,8 +150,15 @@ impl SafekeeperPostgresHandler {
     /// Handle IDENTIFY_SYSTEM replication command
     ///
     fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
-        let start_pos = self.timeline.get().get_end_of_wal();
-        let lsn = start_pos.to_string();
+        let lsn = if self.is_walproposer_recovery() {
+            // walproposer should get all local WAL until flush_lsn
+            self.timeline.get().get_end_of_wal()
+        } else {
+            // other clients shouldn't get any uncommitted WAL
+            self.timeline.get().get_state().0.commit_lsn
+        }
+        .to_string();
+
         let sysid = self
             .timeline
             .get()
@@ -203,4 +207,11 @@ impl SafekeeperPostgresHandler {
             .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
         Ok(())
     }
+
+    /// Returns true if current connection is a replication connection, originating
+    /// from a walproposer recovery function. This connection gets a special handling:
+    /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
+    pub fn is_walproposer_recovery(&self) -> bool {
+        self.appname == Some("wal_proposer_recovery".to_string())
+    }
 }
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 243d7bf7d0..97ec945c3e 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -170,6 +170,7 @@ impl ReplicationConn {
         // spawn the background thread which receives HotStandbyFeedback messages.
         let bg_timeline = Arc::clone(spg.timeline.get());
         let bg_stream_in = self.stream_in.take().unwrap();
+        let bg_timeline_id = spg.ztimelineid.unwrap();

         let state = ReplicaState::new();
         // This replica_id is used below to check if it's time to stop replication.
@@ -188,6 +189,8 @@ impl ReplicationConn {
         let _ = thread::Builder::new()
             .name("HotStandbyFeedback thread".into())
             .spawn(move || {
+                let _enter =
+                    info_span!("HotStandbyFeedback thread", timeline = %bg_timeline_id).entered();
                 if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
                     error!("Replication background thread failed: {}", err);
                 }
@@ -198,13 +201,12 @@ impl ReplicationConn {
             .build()?;

         runtime.block_on(async move {
-            let (_, persisted_state) = spg.timeline.get().get_state();
+            let (inmem_state, persisted_state) = spg.timeline.get().get_state();
             // add persisted_state.timeline_start_lsn == Lsn(0) check
             if persisted_state.server.wal_seg_size == 0 {
                 bail!("Cannot start replication before connecting to walproposer");
             }

-            let wal_end = spg.timeline.get().get_end_of_wal();
             // Walproposer gets special handling: safekeeper must give proposer all
             // local WAL till the end, whether committed or not (walproposer will
             // hang otherwise). That's because walproposer runs the consensus and
             // another compute rises which collects majority and starts fixing log
             // on this safekeeper itself. That's ok as (old) proposer will never be
             // able to commit such WAL.
-            let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string())
-            {
+            let stop_pos: Option<Lsn> = if spg.is_walproposer_recovery() {
+                let wal_end = spg.timeline.get().get_end_of_wal();
                 Some(wal_end)
             } else {
                 None
             };
@@ -226,7 +228,7 @@ impl ReplicationConn {
             // switch to copy
             pgb.write_message(&BeMessage::CopyBothResponse)?;

-            let mut end_pos = Lsn(0);
+            let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);

             let mut wal_reader = WalReader::new(
                 spg.conf.timeline_dir(&spg.timeline.get().zttid),
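The retry bookkeeping in connection_manager.rs (drop_old_connection plus time_until_next_retry) is the core of the new policy: the cooldown is anchored to when the previous connection started, so a connection that stayed healthy for a while can be retried immediately, while one that dies right away has to wait, and the wait grows by WALCONNECTION_RETRY_BACKOFF_MULTIPLIER up to the 15 s cap. The following standalone sketch is not part of the patch; `RetryInfo` here is a simplified stand-in and only the constants and the arithmetic mirror the diff above.

```rust
use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc};
use std::time::Duration;

const RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;

/// Simplified stand-in for the patch's per-safekeeper `RetryInfo`.
struct RetryInfo {
    next_retry_at: Option<NaiveDateTime>,
    retry_duration_seconds: f64,
}

/// Schedule the next allowed attempt relative to when the previous connection started,
/// then grow the backoff for the attempt after that, clamped to [min, max].
fn schedule_next_retry(retry: &mut RetryInfo, connection_started_at: NaiveDateTime) {
    retry.next_retry_at = connection_started_at.checked_add_signed(
        ChronoDuration::milliseconds((retry.retry_duration_seconds * 1000.0) as i64),
    );

    let grown = retry.retry_duration_seconds * RETRY_BACKOFF_MULTIPLIER;
    retry.retry_duration_seconds = grown
        .min(RETRY_MAX_BACKOFF_SECONDS)
        .max(RETRY_MIN_BACKOFF_SECONDS);
}

/// How long to sleep before the next attempt, or `None` if the cooldown already passed.
fn time_until_next_retry(retry: &RetryInfo) -> Option<Duration> {
    let now = Utc::now().naive_utc();
    retry
        .next_retry_at
        .filter(|next_retry_at| next_retry_at > &now)
        .and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
}

fn main() {
    let mut retry = RetryInfo {
        next_retry_at: None,
        retry_duration_seconds: RETRY_MIN_BACKOFF_SECONDS,
    };

    // A connection that failed immediately: the cooldown lands in the future.
    schedule_next_retry(&mut retry, Utc::now().naive_utc());
    println!("sleep before retrying: {:?}", time_until_next_retry(&retry));

    // A connection that had been streaming for a minute: the cooldown is already in the past.
    schedule_next_retry(&mut retry, Utc::now().naive_utc() - ChronoDuration::seconds(60));
    println!("sleep before retrying: {:?}", time_until_next_retry(&retry));
}
```

Because the backoff is added to started_at rather than to the failure time, a long-lived connection effectively resets the wait to zero, and the has_processed_wal event removes the entry entirely so the grown retry_duration_seconds does not linger.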
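The new select! arm feeds that Option<Duration> into tokio::time::sleep behind a branch precondition, so the manager loop wakes up exactly when the earliest cooldown expires and never polls the sleep when there is nothing to wait for. A minimal sketch of the same pattern, with an assumed toy setup (a one-shot 300 ms cooldown and a dummy ticker standing in for the loop's other branches), not the pageserver's actual loop:

```rust
// Sketch only; assumes tokio = { version = "1", features = ["full"] }.
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Pretend the retry bookkeeping reported "wait 300 ms before the next attempt".
    let time_until_next_retry: Option<Duration> = Some(Duration::from_millis(300));
    let mut ticker = tokio::time::interval(Duration::from_secs(1));

    loop {
        let mut cooldown_expired = false;

        tokio::select! {
            // The precondition disables this branch when there is no pending cooldown;
            // the `unwrap()` sits inside an async block that is never polled in that case.
            _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await },
                if time_until_next_retry.is_some() => {
                cooldown_expired = true;
            }
            _ = ticker.tick() => {
                println!("other select! work (broker updates, connection task events, ...)");
            }
        }

        if cooldown_expired {
            println!("cooldown expired, a new connection attempt may be made");
            break;
        }
    }
}
```

Recomputing time_until_next_retry at the top of every loop iteration, as the patch does, keeps the sleep in sync with whatever drop_old_connection scheduled most recently.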