From 116ecdf87a94d486b60911c0a95ec3e949f03202 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Mon, 15 Aug 2022 13:31:26 +0300
Subject: [PATCH] Improve walreceiver logic (#2253)

This patch makes the walreceiver logic more complicated, but it should work
better in most cases. Added `test_wal_lagging` to test scenarios where live
safekeepers can lag behind other live safekeepers.

- There was a bug where the
  `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())`
  check filtered out all safekeepers in some strange cases. I removed this
  filter; it should probably help with #2237
- Now walreceiver_connection reports its status, including commit_lsn. This
  allows keeping the safekeeper connection even when etcd is down.
- The safekeeper connection now fails if the pageserver doesn't receive
  safekeeper messages for some time. Usually a safekeeper sends messages at
  least once per second.
- The `LaggingWal` check now uses the `commit_lsn` reported directly by the
  safekeeper. This fixes the issue of frequent reconnects when the compute
  generates WAL really fast.
- `NoWalTimeout` is rewritten to trigger only when we know that new WAL exists
  and the connected safekeeper doesn't stream any of it. This allows setting a
  small `lagging_wal_timeout`, because it will trigger only when we observe
  that the connected safekeeper is stuck.

(A simplified sketch of these reconnect rules is included after the diff.)
---
 pageserver/src/tenant_config.rs               |   2 +-
 .../src/walreceiver/connection_manager.rs     | 402 +++++++++++-------
 .../src/walreceiver/walreceiver_connection.rs |  77 +++-
 test_runner/batch_others/test_wal_acceptor.py |   8 +-
 .../batch_others/test_wal_acceptor_async.py   |  65 +++
 5 files changed, 380 insertions(+), 174 deletions(-)

diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs
index eff5272837..73bf3636d2 100644
--- a/pageserver/src/tenant_config.rs
+++ b/pageserver/src/tenant_config.rs
@@ -37,7 +37,7 @@ pub mod defaults {
     pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
     pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
 
     pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
-    pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
+    pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
     pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
 }
diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs
index 09142c4d44..2722bc7320 100644
--- a/pageserver/src/walreceiver/connection_manager.rs
+++ b/pageserver/src/walreceiver/connection_manager.rs
@@ -17,7 +17,7 @@ use std::{
 };
 
 use anyhow::Context;
-use chrono::{DateTime, Local, NaiveDateTime, Utc};
+use chrono::{NaiveDateTime, Utc};
 use etcd_broker::{
     subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
     BrokerUpdate, Client,
@@ -33,11 +33,10 @@ use crate::{
 use crate::{RepositoryImpl, TimelineImpl};
 use utils::{
     lsn::Lsn,
-    pq_proto::ReplicationFeedback,
     zid::{NodeId, ZTenantTimelineId},
 };
 
-use super::{TaskEvent, TaskHandle};
+use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
 
 /// Spawns the loop to take care of the timeline's WAL streaming connection.
 pub(super) fn spawn_connection_manager_task(
@@ -114,21 +113,26 @@ async fn connection_manager_loop_step(
                 }
             } => {
                 let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select!
guard"); - match &wal_connection_update { + match wal_connection_update { TaskEvent::Started => { - wal_connection.latest_connection_update = Utc::now().naive_utc(); *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1; }, - TaskEvent::NewEvent(replication_feedback) => { - wal_connection.latest_connection_update = DateTime::::from(replication_feedback.ps_replytime).naive_utc(); - // reset connection attempts here only, the only place where both nodes - // explicitly confirmn with replication feedback that they are connected to each other - walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id); + TaskEvent::NewEvent(status) => { + if status.has_received_wal { + // Reset connection attempts here only, we know that safekeeper is healthy + // because it can send us a WAL update. + walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id); + } + wal_connection.status = status; }, TaskEvent::End(end_result) => { match end_result { Ok(()) => debug!("WAL receiving task finished"), - Err(e) => warn!("WAL receiving task failed: {e}"), + Err(e) => { + warn!("WAL receiving task failed: {e}"); + // If the task failed, set the connection attempts to at least 1, to try other safekeepers. + let _ = *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(1); + } }; walreceiver_state.wal_connection = None; }, @@ -257,10 +261,21 @@ struct WalreceiverState { struct WalConnection { /// Current safekeeper pageserver is connected to for WAL streaming. sk_id: NodeId, - /// Connection task start time or the timestamp of a latest connection message received. - latest_connection_update: NaiveDateTime, + /// Status of the connection. + status: WalConnectionStatus, /// WAL streaming task handle. - connection_task: TaskHandle, + connection_task: TaskHandle, + /// Have we discovered that other safekeeper has more recent WAL than we do? + discovered_new_wal: Option, +} + +/// Notion of a new committed WAL, which exists on other safekeeper. +#[derive(Debug, Clone, Copy)] +struct NewCommittedWAL { + /// LSN of the new committed WAL. + lsn: Lsn, + /// When we discovered that the new committed WAL exists on other safekeeper. + discovered_at: NaiveDateTime, } /// Data about the timeline to connect to, received from etcd. @@ -327,10 +342,19 @@ impl WalreceiverState { .instrument(info_span!("walreceiver_connection", id = %id)) }); + let now = Utc::now().naive_utc(); self.wal_connection = Some(WalConnection { sk_id: new_sk_id, - latest_connection_update: Utc::now().naive_utc(), + status: WalConnectionStatus { + is_connected: false, + has_received_wal: false, + latest_connection_update: now, + latest_wal_update: now, + streaming_lsn: None, + commit_lsn: None, + }, connection_task: connection_handle, + discovered_new_wal: None, }); } @@ -361,14 +385,16 @@ impl WalreceiverState { /// Cleans up stale etcd records and checks the rest for the new connection candidate. /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise. 
/// The current rules for approving new candidates: - /// * pick from the input data from etcd for currently connected safekeeper (if any) - /// * out of the rest input entries, pick one with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline + /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps /// * if there's no such entry, no new candidate found, abort - /// * check the current connection time data for staleness, reconnect if stale - /// * otherwise, check if etcd updates contain currently connected safekeeper - /// * if not, that means no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) - /// Reconnect if the time exceeds the threshold. - /// * if there's one, compare its Lsn with the other candidate's, reconnect if candidate's over threshold + /// * otherwise check if the candidate is much better than the current one + /// + /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation. + /// General rules are following: + /// * if connected safekeeper is not present, pick the candidate + /// * if we haven't received any updates for some time, pick the candidate + /// * if the candidate commit_lsn is much higher than the current one, pick the candidate + /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate /// /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently. /// Both thresholds are configured per tenant. @@ -384,53 +410,128 @@ impl WalreceiverState { let now = Utc::now().naive_utc(); if let Ok(latest_interaciton) = - (now - existing_wal_connection.latest_connection_update).to_std() + (now - existing_wal_connection.status.latest_connection_update).to_std() { - if latest_interaciton > self.lagging_wal_timeout { + // Drop connection if we haven't received keepalive message for a while. + if latest_interaciton > self.wal_connect_timeout { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, wal_source_connstr: new_wal_source_connstr, - reason: ReconnectReason::NoWalTimeout { - last_wal_interaction: Some( - existing_wal_connection.latest_connection_update, + reason: ReconnectReason::NoKeepAlives { + last_keep_alive: Some( + existing_wal_connection.status.latest_connection_update, ), check_time: now, - threshold: self.lagging_wal_timeout, + threshold: self.wal_connect_timeout, }, }); } } - match self.wal_stream_candidates.get(&connected_sk_node) { - Some(current_connection_etcd_data) => { - let new_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0)); - let current_lsn = current_connection_etcd_data - .timeline - .commit_lsn - .unwrap_or(Lsn(0)); - match new_lsn.0.checked_sub(current_lsn.0) - { - Some(new_sk_lsn_advantage) => { - if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { - return Some( - NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, - reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, - }); - } - } - None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"), + if !existing_wal_connection.status.is_connected { + // We haven't connected yet and we shouldn't switch until connection timeout (condition above). 
+ return None; + } + + if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn { + let new_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0)); + // Check if the new candidate has much more WAL than the current one. + match new_commit_lsn.0.checked_sub(current_commit_lsn.0) { + Some(new_sk_lsn_advantage) => { + if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_source_connstr: new_wal_source_connstr, + reason: ReconnectReason::LaggingWal { + current_commit_lsn, + new_commit_lsn, + threshold: self.max_lsn_wal_lag, + }, + }); } - } - None => { - return Some(NewWalConnectionCandidate { - safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, - reason: ReconnectReason::NoEtcdDataForExistingConnection, - }) + } + None => debug!( + "Best SK candidate has its commit_lsn behind connected SK's commit_lsn" + ), } } + + let current_lsn = match existing_wal_connection.status.streaming_lsn { + Some(lsn) => lsn, + None => self.local_timeline.get_last_record_lsn(), + }; + let current_commit_lsn = existing_wal_connection + .status + .commit_lsn + .unwrap_or(current_lsn); + let candidate_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0)); + + // Keep discovered_new_wal only if connected safekeeper has not caught up yet. + let mut discovered_new_wal = existing_wal_connection + .discovered_new_wal + .filter(|new_wal| new_wal.lsn > current_commit_lsn); + + if discovered_new_wal.is_none() { + // Check if the new candidate has more WAL than the current one. + // If the new candidate has more WAL than the current one, we consider switching to the new candidate. + discovered_new_wal = if candidate_commit_lsn > current_commit_lsn { + trace!( + "New candidate has commit_lsn {}, higher than current_commit_lsn {}", + candidate_commit_lsn, + current_commit_lsn + ); + Some(NewCommittedWAL { + lsn: candidate_commit_lsn, + discovered_at: Utc::now().naive_utc(), + }) + } else { + None + }; + } + + let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn { + // Connected safekeeper has more WAL, but we haven't received updates for some time. + trace!( + "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}", + (now - existing_wal_connection.status.latest_wal_update).to_std(), + current_lsn, + current_commit_lsn + ); + Some(existing_wal_connection.status.latest_wal_update) + } else { + discovered_new_wal.as_ref().map(|new_wal| { + // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it. + new_wal + .discovered_at + .max(existing_wal_connection.status.latest_wal_update) + }) + }; + + // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it. 
+ if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since { + if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() { + if candidate_commit_lsn > current_commit_lsn + && waiting_for_new_wal > self.lagging_wal_timeout + { + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_source_connstr: new_wal_source_connstr, + reason: ReconnectReason::NoWalTimeout { + current_lsn, + current_commit_lsn, + candidate_commit_lsn, + last_wal_interaction: Some( + existing_wal_connection.status.latest_wal_update, + ), + check_time: now, + threshold: self.lagging_wal_timeout, + }, + }); + } + } + } + + self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal; } None => { let (new_sk_id, _, new_wal_source_connstr) = @@ -450,7 +551,7 @@ impl WalreceiverState { /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another. /// /// The candidate that is chosen: - /// * has fewest connection attempts from pageserver to safekeeper node (reset every time the WAL replication feedback is sent) + /// * has fewest connection attempts from pageserver to safekeeper node (reset every time we receive a WAL message from the node) /// * has greatest data Lsn among the ones that are left /// /// NOTE: @@ -489,14 +590,13 @@ impl WalreceiverState { .max_by_key(|(_, info, _)| info.commit_lsn) } + /// Returns a list of safekeepers that have valid info and ready for connection. fn applicable_connection_candidates( &self, ) -> impl Iterator { self.wal_stream_candidates .iter() - .filter(|(_, etcd_info)| { - etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn()) - }) + .filter(|(_, info)| info.timeline.commit_lsn.is_some()) .filter_map(|(sk_id, etcd_info)| { let info = &etcd_info.timeline; match wal_stream_connection_string( @@ -512,6 +612,7 @@ impl WalreceiverState { }) } + /// Remove candidates which haven't sent etcd updates for a while. 
fn cleanup_old_candidates(&mut self) { let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len()); @@ -546,17 +647,24 @@ struct NewWalConnectionCandidate { #[derive(Debug, PartialEq, Eq)] enum ReconnectReason { NoExistingConnection, - NoEtcdDataForExistingConnection, LaggingWal { - current_lsn: Lsn, - new_lsn: Lsn, + current_commit_lsn: Lsn, + new_commit_lsn: Lsn, threshold: NonZeroU64, }, NoWalTimeout { + current_lsn: Lsn, + current_commit_lsn: Lsn, + candidate_commit_lsn: Lsn, last_wal_interaction: Option, check_time: NaiveDateTime, threshold: Duration, }, + NoKeepAlives { + last_keep_alive: Option, + check_time: NaiveDateTime, + threshold: Duration, + }, } fn wal_stream_connection_string( @@ -580,7 +688,6 @@ fn wal_stream_connection_string( #[cfg(test)] mod tests { - use std::time::SystemTime; use crate::repository::{ repo_harness::{RepoHarness, TIMELINE_ID}, @@ -658,7 +765,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: None, }, etcd_version: 0, latest_update: delay_over_threshold, @@ -684,22 +791,26 @@ mod tests { let connected_sk_id = NodeId(0); let current_lsn = 100_000; + let connection_status = WalConnectionStatus { + is_connected: true, + has_received_wal: true, + latest_connection_update: now, + latest_wal_update: now, + commit_lsn: Some(Lsn(current_lsn)), + streaming_lsn: Some(Lsn(current_lsn)), + }; + state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap(); state.wal_connection = Some(WalConnection { sk_id: connected_sk_id, - latest_connection_update: now, + status: connection_status.clone(), connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskEvent::NewEvent(ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: 1, - ps_applylsn: current_lsn, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - })) + .send(TaskEvent::NewEvent(connection_status.clone())) .ok(); Ok(()) }), + discovered_new_wal: None, }); state.wal_stream_candidates = HashMap::from([ ( @@ -924,65 +1035,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("connection_no_etcd_data_candidate")?; - let mut state = dummy_state(&harness); - - let now = Utc::now().naive_utc(); - let current_lsn = Lsn(100_000).align(); - let connected_sk_id = NodeId(0); - let other_sk_id = NodeId(connected_sk_id.0 + 1); - - state.wal_connection = Some(WalConnection { - sk_id: connected_sk_id, - latest_connection_update: now, - connection_task: TaskHandle::spawn(move |sender, _| async move { - sender - .send(TaskEvent::NewEvent(ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: current_lsn.0, - ps_applylsn: 1, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - })) - .ok(); - Ok(()) - }), - }); - state.wal_stream_candidates = HashMap::from([( - other_sk_id, - EtcdSkTimeline { - timeline: SkTimelineInfo { - last_log_term: None, - flush_lsn: None, - commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())), - backup_lsn: None, - remote_consistent_lsn: None, - peer_horizon_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), - }, - etcd_version: 0, - latest_update: now, - }, - )]); - - let only_candidate = state - .next_connection_candidate() - .expect("Expected one candidate selected out of the only data option, but got none"); - assert_eq!(only_candidate.safekeeper_id, other_sk_id); - assert_eq!( - 
only_candidate.reason, - ReconnectReason::NoEtcdDataForExistingConnection, - "Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper" - ); - assert!(only_candidate - .wal_source_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); - - Ok(()) - } - #[tokio::test] async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; @@ -993,21 +1045,25 @@ mod tests { let connected_sk_id = NodeId(0); let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1); + let connection_status = WalConnectionStatus { + is_connected: true, + has_received_wal: true, + latest_connection_update: now, + latest_wal_update: now, + commit_lsn: Some(current_lsn), + streaming_lsn: Some(current_lsn), + }; + state.wal_connection = Some(WalConnection { sk_id: connected_sk_id, - latest_connection_update: now, + status: connection_status.clone(), connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskEvent::NewEvent(ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: current_lsn.0, - ps_applylsn: 1, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - })) + .send(TaskEvent::NewEvent(connection_status.clone())) .ok(); Ok(()) }), + discovered_new_wal: None, }); state.wal_stream_candidates = HashMap::from([ ( @@ -1052,8 +1108,8 @@ mod tests { assert_eq!( over_threshcurrent_candidate.reason, ReconnectReason::LaggingWal { - current_lsn, - new_lsn, + current_commit_lsn: current_lsn, + new_commit_lsn: new_lsn, threshold: state.max_lsn_wal_lag }, "Should select bigger WAL safekeeper if it starts to lag enough" @@ -1066,31 +1122,35 @@ mod tests { } #[tokio::test] - async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; + async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_connection_threshhold_current_candidate")?; let mut state = dummy_state(&harness); let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); - let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; + let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?; let time_over_threshold = - Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout; + + let connection_status = WalConnectionStatus { + is_connected: true, + has_received_wal: true, + latest_connection_update: time_over_threshold, + latest_wal_update: time_over_threshold, + commit_lsn: Some(current_lsn), + streaming_lsn: Some(current_lsn), + }; state.wal_connection = Some(WalConnection { sk_id: NodeId(1), - latest_connection_update: time_over_threshold, + status: connection_status.clone(), connection_task: TaskHandle::spawn(move |sender, _| async move { sender - .send(TaskEvent::NewEvent(ReplicationFeedback { - current_timeline_size: 1, - ps_writelsn: current_lsn.0, - ps_applylsn: 1, - ps_flushlsn: 1, - ps_replytime: SystemTime::now(), - })) + .send(TaskEvent::NewEvent(connection_status.clone())) .ok(); Ok(()) }), + discovered_new_wal: None, }); state.wal_stream_candidates = HashMap::from([( NodeId(0), @@ -1115,12 +1175,12 @@ mod tests { assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); match over_threshcurrent_candidate.reason { - ReconnectReason::NoWalTimeout { - 
last_wal_interaction, + ReconnectReason::NoKeepAlives { + last_keep_alive, threshold, .. } => { - assert_eq!(last_wal_interaction, Some(time_over_threshold)); + assert_eq!(last_keep_alive, Some(time_over_threshold)); assert_eq!(threshold, state.lagging_wal_timeout); } unexpected => panic!("Unexpected reason: {unexpected:?}"), @@ -1133,20 +1193,34 @@ mod tests { } #[tokio::test] - async fn timeout_connection_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_connection_over_threshhold_current_candidate")?; + async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; let mut state = dummy_state(&harness); let current_lsn = Lsn(100_000).align(); + let new_lsn = Lsn(100_100).align(); let now = Utc::now().naive_utc(); let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?; let time_over_threshold = Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + let connection_status = WalConnectionStatus { + is_connected: true, + has_received_wal: true, + latest_connection_update: now, + latest_wal_update: time_over_threshold, + commit_lsn: Some(current_lsn), + streaming_lsn: Some(current_lsn), + }; + state.wal_connection = Some(WalConnection { sk_id: NodeId(1), - latest_connection_update: time_over_threshold, + status: connection_status, connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }), + discovered_new_wal: Some(NewCommittedWAL { + discovered_at: time_over_threshold, + lsn: new_lsn, + }), }); state.wal_stream_candidates = HashMap::from([( NodeId(0), @@ -1154,7 +1228,7 @@ mod tests { timeline: SkTimelineInfo { last_log_term: None, flush_lsn: None, - commit_lsn: Some(current_lsn), + commit_lsn: Some(new_lsn), backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, @@ -1172,10 +1246,16 @@ mod tests { assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); match over_threshcurrent_candidate.reason { ReconnectReason::NoWalTimeout { + current_lsn, + current_commit_lsn, + candidate_commit_lsn, last_wal_interaction, threshold, .. } => { + assert_eq!(current_lsn, current_lsn); + assert_eq!(current_commit_lsn, current_lsn); + assert_eq!(candidate_commit_lsn, new_lsn); assert_eq!(last_wal_interaction, Some(time_over_threshold)); assert_eq!(threshold, state.lagging_wal_timeout); } @@ -1202,7 +1282,7 @@ mod tests { .expect("Failed to create an empty timeline for dummy wal connection manager"), wal_connect_timeout: Duration::from_secs(1), lagging_wal_timeout: Duration::from_secs(1), - max_lsn_wal_lag: NonZeroU64::new(1).unwrap(), + max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(), wal_connection: None, wal_stream_candidates: HashMap::new(), wal_connection_attempts: HashMap::new(), diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 538ebfe30e..16a1f232e3 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::{bail, ensure, Context}; use bytes::BytesMut; +use chrono::{NaiveDateTime, Utc}; use fail::fail_point; use futures::StreamExt; use postgres::{SimpleQueryMessage, SimpleQueryRow}; @@ -29,12 +30,29 @@ use crate::{ use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; +/// Status of the connection. 
+#[derive(Debug, Clone)] +pub struct WalConnectionStatus { + /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running. + pub is_connected: bool, + /// Defines a healthy connection as one on which we have received at least some WAL bytes. + pub has_received_wal: bool, + /// Connection establishment time or the timestamp of a latest connection message received. + pub latest_connection_update: NaiveDateTime, + /// Time of the latest WAL message received. + pub latest_wal_update: NaiveDateTime, + /// Latest WAL update contained WAL up to this LSN. Next WAL message with start from that LSN. + pub streaming_lsn: Option, + /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet. + pub commit_lsn: Option, +} + /// Open a connection to the given safekeeper and receive WAL, sending back progress /// messages as we go. pub async fn handle_walreceiver_connection( id: ZTenantTimelineId, wal_source_connstr: &str, - events_sender: &watch::Sender>, + events_sender: &watch::Sender>, mut cancellation: watch::Receiver<()>, connect_timeout: Duration, ) -> anyhow::Result<()> { @@ -49,12 +67,26 @@ pub async fn handle_walreceiver_connection( .await .context("Timed out while waiting for walreceiver connection to open")? .context("Failed to open walreceiver conection")?; + + info!("connected!"); + let mut connection_status = WalConnectionStatus { + is_connected: true, + has_received_wal: false, + latest_connection_update: Utc::now().naive_utc(), + latest_wal_update: Utc::now().naive_utc(), + streaming_lsn: None, + commit_lsn: None, + }; + if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) { + warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}"); + return Ok(()); + } + // The connection object performs the actual communication with the database, // so spawn it off to run on its own. let mut connection_cancellation = cancellation.clone(); tokio::spawn( async move { - info!("connected!"); select! { connection_result = connection => match connection_result{ Ok(()) => info!("Walreceiver db connection closed"), @@ -84,6 +116,14 @@ pub async fn handle_walreceiver_connection( let identify = identify_system(&mut replication_client).await?; info!("{identify:?}"); + + connection_status.latest_connection_update = Utc::now().naive_utc(); + if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) { + warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}"); + return Ok(()); + } + + // NB: this is a flush_lsn, not a commit_lsn. let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; let ZTenantTimelineId { @@ -118,7 +158,7 @@ pub async fn handle_walreceiver_connection( // There might be some padding after the last full record, skip it. startpoint += startpoint.calc_padding(8u32); - info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}..."); + info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."); let query = format!("START_REPLICATION PHYSICAL {startpoint}"); @@ -140,6 +180,33 @@ pub async fn handle_walreceiver_connection( } } { let replication_message = replication_message?; + let now = Utc::now().naive_utc(); + + // Update the connection status before processing the message. If the message processing + // fails (e.g. 
in walingest), we still want to know latests LSNs from the safekeeper. + match &replication_message { + ReplicationMessage::XLogData(xlog_data) => { + connection_status.latest_connection_update = now; + connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end())); + connection_status.streaming_lsn = Some(Lsn::from( + xlog_data.wal_start() + xlog_data.data().len() as u64, + )); + if !xlog_data.data().is_empty() { + connection_status.latest_wal_update = now; + connection_status.has_received_wal = true; + } + } + ReplicationMessage::PrimaryKeepAlive(keepalive) => { + connection_status.latest_connection_update = now; + connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end())); + } + &_ => {} + }; + if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) { + warn!("Wal connection event listener dropped, aborting the connection: {e}"); + return Ok(()); + } + let status_update = match replication_message { ReplicationMessage::XLogData(xlog_data) => { // Pass the WAL data to the decoder, and see if we can decode @@ -257,10 +324,6 @@ pub async fn handle_walreceiver_connection( .as_mut() .zenith_status_update(data.len() as u64, &data) .await?; - if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) { - warn!("Wal connection event listener dropped, aborting the connection: {e}"); - return Ok(()); - } } } diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index b55ba84756..b6f914858e 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -1090,11 +1090,9 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): # Remove initial tenant fully (two branches are active) response = sk_http.tenant_delete_force(tenant_id) - assert response == { - timeline_id_3: { - "dir_existed": True, - "was_active": True, - } + assert response[timeline_id_3] == { + "dir_existed": True, + "was_active": True, } assert not (sk_data_dir / tenant_id).exists() assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index 5c0cb56880..e1d3ba0919 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ b/test_runner/batch_others/test_wal_acceptor_async.py @@ -520,3 +520,68 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder): pg = env.postgres.create_start('test_safekeepers_race_conditions') asyncio.run(run_race_conditions(env, pg)) + + +# Check that pageserver can select safekeeper with largest commit_lsn +# and switch if LSN is not updated for some time (NoWalTimeout). 
+async def run_wal_lagging(env: NeonEnv, pg: Postgres): + def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str: + # use ports 10, 11 and 12 to simulate unavailable safekeepers + return ','.join([ + f'localhost:{sk.port.pg if active else 10 + i}' + for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)) + ]) + + conn = await pg.connect_async() + await conn.execute('CREATE TABLE t(key int primary key, value text)') + await conn.close() + pg.stop() + + n_iterations = 20 + n_txes = 10000 + expected_sum = 0 + i = 1 + quorum = len(env.safekeepers) // 2 + 1 + + for it in range(n_iterations): + active_sk = list(map(lambda _: random.random() >= 0.5, env.safekeepers)) + active_count = sum(active_sk) + + if active_count < quorum: + it -= 1 + continue + + pg.adjust_for_safekeepers(safekeepers_guc(env, active_sk)) + log.info(f'Iteration {it}: {active_sk}') + + pg.start() + conn = await pg.connect_async() + + for _ in range(n_txes): + await conn.execute(f"INSERT INTO t values ({i}, 'payload')") + expected_sum += i + i += 1 + + await conn.close() + pg.stop() + + pg.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers))) + pg.start() + conn = await pg.connect_async() + + log.info(f'Executed {i-1} queries') + + res = await conn.fetchval('SELECT sum(key) FROM t') + assert res == expected_sum + + +# do inserts while restarting postgres and messing with safekeeper addresses +def test_wal_lagging(neon_env_builder: NeonEnvBuilder): + + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + env.neon_cli.create_branch('test_wal_lagging') + pg = env.postgres.create_start('test_wal_lagging') + + asyncio.run(run_wal_lagging(env, pg))
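
The following is a minimal, self-contained Rust sketch of the three reconnect triggers described in the commit message (`NoKeepAlives`, `LaggingWal`, `NoWalTimeout`). It is illustrative only, not the pageserver code: `Status`, `should_reconnect`, and the plain `u64`/`Instant` fields are simplified stand-ins for `WalConnectionStatus`, `next_connection_candidate`, `Lsn`, and the `chrono` timestamps used in the patch, and the thresholds are passed in directly instead of coming from per-tenant config.

use std::time::{Duration, Instant};

// Simplified stand-in for WalConnectionStatus (illustrative only).
#[derive(Debug, Clone, Copy)]
struct Status {
    last_message_at: Instant, // any safekeeper message (keepalive or WAL)
    last_wal_at: Instant,     // last message that actually carried WAL bytes
    streaming_lsn: u64,       // how far the connected safekeeper has streamed to us
    commit_lsn: u64,          // commit_lsn reported by the connected safekeeper
}

#[derive(Debug, PartialEq)]
enum Reconnect {
    NoKeepAlives,
    LaggingWal,
    NoWalTimeout,
}

// Decide whether to switch away from the connected safekeeper, mirroring the
// three triggers from the commit message.
fn should_reconnect(
    now: Instant,
    current: Status,
    candidate_commit_lsn: u64,
    wal_connect_timeout: Duration,
    lagging_wal_timeout: Duration,
    max_lsn_wal_lag: u64,
) -> Option<Reconnect> {
    // 1. No safekeeper messages at all for too long: the connection looks dead.
    if now.duration_since(current.last_message_at) > wal_connect_timeout {
        return Some(Reconnect::NoKeepAlives);
    }
    // 2. The candidate's commit_lsn is far ahead of the commit_lsn reported by
    //    the connected safekeeper (not ahead of the pageserver's apply LSN), so
    //    a fast compute alone no longer causes spurious reconnects.
    if candidate_commit_lsn.saturating_sub(current.commit_lsn) >= max_lsn_wal_lag {
        return Some(Reconnect::LaggingWal);
    }
    // 3. We know newer WAL exists (the connected safekeeper advertises a higher
    //    commit_lsn than it has streamed, or the candidate knows newer WAL), yet
    //    nothing has been streamed to us for lagging_wal_timeout.
    let new_wal_exists = current.commit_lsn > current.streaming_lsn
        || candidate_commit_lsn > current.commit_lsn;
    if new_wal_exists && now.duration_since(current.last_wal_at) > lagging_wal_timeout {
        return Some(Reconnect::NoWalTimeout);
    }
    None
}

fn main() {
    let now = Instant::now();
    let stuck = Status {
        last_message_at: now,                      // keepalives still arriving
        last_wal_at: now - Duration::from_secs(5), // but no WAL for 5 seconds
        streaming_lsn: 1_000,
        commit_lsn: 1_000,
    };
    // The candidate knows about newer WAL while the connected safekeeper
    // streams nothing, so NoWalTimeout should fire.
    let reason = should_reconnect(
        now,
        stuck,
        2_000,                  // candidate commit_lsn
        Duration::from_secs(2), // wal_connect_timeout
        Duration::from_secs(3), // lagging_wal_timeout
        10 * 1024 * 1024,       // max_lsn_wal_lag
    );
    assert_eq!(reason, Some(Reconnect::NoWalTimeout));
    println!("reconnect reason: {reason:?}");
}

The point mirrored here is that `LaggingWal` compares safekeeper-reported `commit_lsn`s rather than the pageserver's last record LSN, and `NoWalTimeout` fires only when newer WAL is known to exist but is not being streamed, which is what makes the small default `lagging_wal_timeout` (3 seconds) safe.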