Fix walreceiver and safekeeper bugs (#2295)

- There was an issue with a zero commit_lsn: `reason: LaggingWal { current_commit_lsn: 0/0, new_commit_lsn: 1/6FD90D38, threshold: 10485760 } }`. The problem was in `send_wal.rs`, where we initialized `end_pos = Lsn(0)` and in some cases sent it to the pageserver.
- IDENTIFY_SYSTEM previously returned `flush_lsn` as the physical end of WAL. Now it returns `flush_lsn` (as before) to walproposer and `commit_lsn` to everyone else, including the pageserver (a condensed sketch follows this list).
- There was an issue with backoff where the connection was cancelled right after initialization: `connected!` -> `safekeeper_handle_db: Connection cancelled` -> `Backoff: waiting 3 seconds`. The problem was sleeping before establishing the connection; this is fixed by reworking the retry logic.
- There was an issue with getting the `NoKeepAlives` reason in a loop. The cause is probably the same as the previous issue.
- There was an issue with filtering safekeepers based on retry attempts, which could filter out some safekeepers indefinitely. This is fixed by using a retry cooldown duration instead of retry attempt counts.
- Some `send_wal.rs` connections failed with errors that lacked context. This is fixed by adding the timeline to safekeeper errors.
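The IDENTIFY_SYSTEM change boils down to choosing which LSN to report based on who is asking. A condensed, standalone sketch of that choice (the real code reads the LSNs from the safekeeper's timeline state in `handle_identify_system`, shown in the diff below; the function and parameter names here are illustrative):

```rust
/// Illustrative sketch only: which LSN IDENTIFY_SYSTEM reports.
/// `flush_lsn` is the physical end of local WAL, `commit_lsn` is the
/// highest LSN known to be committed by the quorum.
fn identify_system_lsn(is_walproposer_recovery: bool, flush_lsn: u64, commit_lsn: u64) -> u64 {
    if is_walproposer_recovery {
        // walproposer runs recovery and must see all local WAL, committed or not
        flush_lsn
    } else {
        // pageserver and other clients must never see uncommitted WAL
        commit_lsn
    }
}

fn main() {
    // A pageserver connection only learns about committed WAL.
    assert_eq!(identify_system_lsn(false, 0x1_6FD9_0D38, 0x1_6FD8_0000), 0x1_6FD8_0000);
}
```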

New retry logic works like this:
- Every candidate has a `next_retry_at` timestamp and is not considered for connection until that moment.
- When a walreceiver connection is closed, we update `next_retry_at` using exponential backoff, increasing the cooldown on every disconnect.
- When `last_record_lsn` is advanced using WAL from a safekeeper, we reset that safekeeper's retry cooldown and exponential backoff, allowing the walreceiver to reconnect to it instantly (see the sketch after this list).
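A rough, self-contained sketch of the cooldown bookkeeping (the real implementation is `drop_old_connection`/`time_until_next_retry` in the diff below and tracks absolute `next_retry_at` timestamps; the `RetryInfo` methods here are simplified for illustration):

```rust
// Backoff constants introduced by this change (names shortened here).
const RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;

/// Simplified per-safekeeper retry state; the real struct also stores an
/// absolute `next_retry_at` timestamp computed from the connection start time.
struct RetryInfo {
    retry_duration_seconds: f64,
}

impl RetryInfo {
    /// Called when the connection to this safekeeper closes: returns the
    /// cooldown to apply now and grows it for the next failure, clamped to [min, max].
    fn on_disconnect(&mut self) -> f64 {
        let current = self.retry_duration_seconds;
        self.retry_duration_seconds = (current * RETRY_BACKOFF_MULTIPLIER)
            .clamp(RETRY_MIN_BACKOFF_SECONDS, RETRY_MAX_BACKOFF_SECONDS);
        current
    }

    /// Called when last_record_lsn advanced using WAL from this safekeeper:
    /// the retry history is forgotten, so reconnecting is allowed immediately.
    fn on_wal_processed(&mut self) {
        self.retry_duration_seconds = RETRY_MIN_BACKOFF_SECONDS;
    }
}

fn main() {
    let mut retry = RetryInfo {
        retry_duration_seconds: RETRY_MIN_BACKOFF_SECONDS,
    };
    // The first failure waits the minimum; each further failure waits 1.5x more...
    assert_eq!(retry.on_disconnect(), RETRY_MIN_BACKOFF_SECONDS);
    for _ in 0..20 {
        retry.on_disconnect();
    }
    // ...until the cooldown is capped at 15 seconds.
    assert_eq!(retry.on_disconnect(), RETRY_MAX_BACKOFF_SECONDS);
    // WAL successfully processed from this safekeeper: reset instantly.
    retry.on_wal_processed();
    assert_eq!(retry.on_disconnect(), RETRY_MIN_BACKOFF_SECONDS);
}
```

Because the cooldown is counted from the time the previous connection was started, a connection that stayed healthy for a long while effectively has no cooldown once it eventually drops.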
Author: Arthur Petukhovsky
Date: 2022-08-18 13:38:23 +03:00 (committed by GitHub)
Parent: 1a07ddae5f
Commit: 976576ae59
4 changed files with 189 additions and 105 deletions

View File

@@ -96,6 +96,8 @@ async fn connection_manager_loop_step(
     info!("Subscribed for etcd timeline changes, waiting for new etcd data");
     loop {
+        let time_until_next_retry = walreceiver_state.time_until_next_retry();
         select! {
             broker_connection_result = &mut broker_subscription.watcher_handle => {
                 cleanup_broker_connection(broker_connection_result, walreceiver_state);
@@ -110,27 +112,23 @@ async fn connection_manager_loop_step(
             } => {
                 let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
                 match wal_connection_update {
-                    TaskEvent::Started => {
-                        *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
-                    },
+                    TaskEvent::Started => {},
                     TaskEvent::NewEvent(status) => {
-                        if status.has_received_wal {
-                            // Reset connection attempts here only, we know that safekeeper is healthy
-                            // because it can send us a WAL update.
-                            walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
+                        if status.has_processed_wal {
+                            // We have advanced last_record_lsn by processing the WAL received
+                            // from this safekeeper. This is good enough to clean unsuccessful
+                            // retries history and allow reconnecting to this safekeeper without
+                            // sleeping for a long time.
+                            walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
                         }
                         wal_connection.status = status;
                     },
                     TaskEvent::End(end_result) => {
                         match end_result {
                             Ok(()) => debug!("WAL receiving task finished"),
-                            Err(e) => {
-                                warn!("WAL receiving task failed: {e}");
-                                // If the task failed, set the connection attempts to at least 1, to try other safekeepers.
-                                let _ = *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(1);
-                            }
+                            Err(e) => warn!("WAL receiving task failed: {e}"),
                         };
-                        walreceiver_state.wal_connection = None;
+                        walreceiver_state.drop_old_connection(false).await;
                     },
                 }
             },
@@ -154,6 +152,8 @@ async fn connection_manager_loop_step(
                     }
                 }
             },
+            _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
         }
         // Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly.
@@ -234,6 +234,10 @@ async fn subscribe_for_timeline_updates(
     }
 }
+const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
+const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
+const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
 /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
 struct WalreceiverState {
     id: ZTenantTimelineId,
@@ -247,7 +251,8 @@ struct WalreceiverState {
     max_lsn_wal_lag: NonZeroU64,
     /// Current connection to safekeeper for WAL streaming.
     wal_connection: Option<WalConnection>,
-    wal_connection_attempts: HashMap<NodeId, u32>,
+    /// Info about retries and unsuccessful attempts to connect to safekeepers.
+    wal_connection_retries: HashMap<NodeId, RetryInfo>,
     /// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id.
     wal_stream_candidates: HashMap<NodeId, EtcdSkTimeline>,
 }
@@ -255,6 +260,8 @@ struct WalreceiverState {
 /// Current connection data.
 #[derive(Debug)]
 struct WalConnection {
+    /// Time when the connection was initiated.
+    started_at: NaiveDateTime,
     /// Current safekeeper pageserver is connected to for WAL streaming.
     sk_id: NodeId,
     /// Status of the connection.
@@ -274,6 +281,12 @@ struct NewCommittedWAL {
     discovered_at: NaiveDateTime,
 }
+#[derive(Debug)]
+struct RetryInfo {
+    next_retry_at: Option<NaiveDateTime>,
+    retry_duration_seconds: f64,
+}
 /// Data about the timeline to connect to, received from etcd.
 #[derive(Debug)]
 struct EtcdSkTimeline {
@@ -300,31 +313,18 @@ impl WalreceiverState {
             max_lsn_wal_lag,
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
-            wal_connection_attempts: HashMap::new(),
+            wal_connection_retries: HashMap::new(),
         }
     }
     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
-        if let Some(old_connection) = self.wal_connection.take() {
-            old_connection.connection_task.shutdown().await
-        }
+        self.drop_old_connection(true).await;
         let id = self.id;
         let connect_timeout = self.wal_connect_timeout;
-        let connection_attempt = self
-            .wal_connection_attempts
-            .get(&new_sk_id)
-            .copied()
-            .unwrap_or(0);
         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
             async move {
-                exponential_backoff(
-                    connection_attempt,
-                    DEFAULT_BASE_BACKOFF_SECONDS,
-                    DEFAULT_MAX_BACKOFF_SECONDS,
-                )
-                .await;
                 super::walreceiver_connection::handle_walreceiver_connection(
                     id,
                     &new_wal_source_connstr,
@@ -340,10 +340,11 @@ impl WalreceiverState {
         let now = Utc::now().naive_utc();
         self.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: new_sk_id,
             status: WalConnectionStatus {
                 is_connected: false,
-                has_received_wal: false,
+                has_processed_wal: false,
                 latest_connection_update: now,
                 latest_wal_update: now,
                 streaming_lsn: None,
@@ -354,6 +355,71 @@ impl WalreceiverState {
         });
     }
+    /// Drops the current connection (if any) and updates retry timeout for the next
+    /// connection attempt to the same safekeeper.
+    async fn drop_old_connection(&mut self, needs_shutdown: bool) {
+        let wal_connection = match self.wal_connection.take() {
+            Some(wal_connection) => wal_connection,
+            None => return,
+        };
+        if needs_shutdown {
+            wal_connection.connection_task.shutdown().await;
+        }
+        let retry = self
+            .wal_connection_retries
+            .entry(wal_connection.sk_id)
+            .or_insert(RetryInfo {
+                next_retry_at: None,
+                retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
+            });
+        let now = Utc::now().naive_utc();
+        // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
+        // and we add backoff to the time when we started the connection attempt. If the connection
+        // was active for a long time, then next_retry_at will be in the past.
+        retry.next_retry_at =
+            wal_connection
+                .started_at
+                .checked_add_signed(chrono::Duration::milliseconds(
+                    (retry.retry_duration_seconds * 1000.0) as i64,
+                ));
+        if let Some(next) = &retry.next_retry_at {
+            if next > &now {
+                info!(
+                    "Next connection retry to {:?} is at {}",
+                    wal_connection.sk_id, next
+                );
+            }
+        }
+        let next_retry_duration =
+            retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
+        // Clamp the next retry duration to the maximum allowed.
+        let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
+        // Clamp the next retry duration to the minimum allowed.
+        let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
+        retry.retry_duration_seconds = next_retry_duration;
+    }
+    /// Returns time needed to wait to have a new candidate for WAL streaming.
+    fn time_until_next_retry(&self) -> Option<Duration> {
+        let now = Utc::now().naive_utc();
+        let next_retry_at = self
+            .wal_connection_retries
+            .values()
+            .filter_map(|retry| retry.next_retry_at)
+            .filter(|next_retry_at| next_retry_at > &now)
+            .min();
+        next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
+    }
     /// Adds another etcd timeline into the state, if its more recent than the one already added there for the same key.
     fn register_timeline_update(&mut self, timeline_update: BrokerUpdate<SkTimelineInfo>) {
         match self
@@ -547,52 +613,37 @@ impl WalreceiverState {
     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     ///
     /// The candidate that is chosen:
-    /// * has fewest connection attempts from pageserver to safekeeper node (reset every time we receive a WAL message from the node)
-    /// * has greatest data Lsn among the ones that are left
-    ///
-    /// NOTE:
-    /// We evict timeline data received from etcd based on time passed since it was registered, along with its connection attempts values, but
-    /// otherwise to reset the connection attempts, a successful connection to that node is needed.
-    /// That won't happen now, before all nodes with less connection attempts are connected to first, which might leave the sk node with more advanced state to be ignored.
+    /// * has no pending retry cooldown
+    /// * has greatest commit_lsn among the ones that are left
     fn select_connection_candidate(
         &self,
        node_to_omit: Option<NodeId>,
     ) -> Option<(NodeId, &SkTimelineInfo, String)> {
-        let all_candidates = self
-            .applicable_connection_candidates()
+        self.applicable_connection_candidates()
             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
-            .collect::<Vec<_>>();
-        let smallest_attempts_allowed = all_candidates
-            .iter()
-            .map(|(sk_id, _, _)| {
-                self.wal_connection_attempts
-                    .get(sk_id)
-                    .copied()
-                    .unwrap_or(0)
-            })
-            .min()?;
-        all_candidates
-            .into_iter()
-            .filter(|(sk_id, _, _)| {
-                smallest_attempts_allowed
-                    >= self
-                        .wal_connection_attempts
-                        .get(sk_id)
-                        .copied()
-                        .unwrap_or(0)
-            })
             .max_by_key(|(_, info, _)| info.commit_lsn)
     }
     /// Returns a list of safekeepers that have valid info and ready for connection.
+    /// Some safekeepers are filtered by the retry cooldown.
     fn applicable_connection_candidates(
         &self,
     ) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
+        let now = Utc::now().naive_utc();
         self.wal_stream_candidates
             .iter()
             .filter(|(_, info)| info.timeline.commit_lsn.is_some())
+            .filter(move |(sk_id, _)| {
+                let next_retry_at = self
+                    .wal_connection_retries
+                    .get(sk_id)
+                    .and_then(|retry_info| {
+                        retry_info.next_retry_at
+                    });
+                next_retry_at.is_none() || next_retry_at.unwrap() <= now
+            })
            .filter_map(|(sk_id, etcd_info)| {
                let info = &etcd_info.timeline;
                match wal_stream_connection_string(
@@ -627,7 +678,7 @@ impl WalreceiverState {
        });
        for node_id in node_ids_to_remove {
-            self.wal_connection_attempts.remove(&node_id);
+            self.wal_connection_retries.remove(&node_id);
        }
    }
 }
@@ -684,7 +735,6 @@ fn wal_stream_connection_string(
 #[cfg(test)]
 mod tests {
     use crate::repository::{
         repo_harness::{RepoHarness, TIMELINE_ID},
         Repository,
@@ -789,7 +839,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: now,
             commit_lsn: Some(Lsn(current_lsn)),
@@ -798,6 +848,7 @@ mod tests {
         state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: connected_sk_id,
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1017,7 +1068,13 @@ mod tests {
             },
            ),
        ]);
-        state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
+        state.wal_connection_retries = HashMap::from([(
+            NodeId(0),
+            RetryInfo {
+                next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
+                retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
+            },
+        )]);
        let candidate_with_less_errors = state
            .next_connection_candidate()
@@ -1025,7 +1082,7 @@ mod tests {
        assert_eq!(
            candidate_with_less_errors.safekeeper_id,
            NodeId(1),
-            "Should select the node with less connection errors"
+            "Should select the node with no pending retry cooldown"
        );
        Ok(())
@@ -1043,7 +1100,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: now,
             commit_lsn: Some(current_lsn),
@@ -1051,6 +1108,7 @@ mod tests {
         };
         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: connected_sk_id,
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1130,7 +1188,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: time_over_threshold,
             latest_wal_update: time_over_threshold,
             commit_lsn: Some(current_lsn),
@@ -1138,6 +1196,7 @@ mod tests {
         };
         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: NodeId(1),
             status: connection_status.clone(),
             connection_task: TaskHandle::spawn(move |sender, _| async move {
@@ -1202,7 +1261,7 @@ mod tests {
         let connection_status = WalConnectionStatus {
             is_connected: true,
-            has_received_wal: true,
+            has_processed_wal: true,
             latest_connection_update: now,
             latest_wal_update: time_over_threshold,
             commit_lsn: Some(current_lsn),
@@ -1210,6 +1269,7 @@ mod tests {
         };
         state.wal_connection = Some(WalConnection {
+            started_at: now,
             sk_id: NodeId(1),
             status: connection_status,
             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
@@ -1281,7 +1341,7 @@ mod tests {
             max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
-            wal_connection_attempts: HashMap::new(),
+            wal_connection_retries: HashMap::new(),
         }
     }
 }

View File

@@ -35,8 +35,9 @@ use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
 pub struct WalConnectionStatus {
     /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
     pub is_connected: bool,
-    /// Defines a healthy connection as one on which we have received at least some WAL bytes.
-    pub has_received_wal: bool,
+    /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
+    /// and is able to process it in walingest without errors.
+    pub has_processed_wal: bool,
     /// Connection establishment time or the timestamp of a latest connection message received.
     pub latest_connection_update: NaiveDateTime,
     /// Time of the latest WAL message received.
@@ -71,7 +72,7 @@ pub async fn handle_walreceiver_connection(
     info!("connected!");
     let mut connection_status = WalConnectionStatus {
         is_connected: true,
-        has_received_wal: false,
+        has_processed_wal: false,
         latest_connection_update: Utc::now().naive_utc(),
         latest_wal_update: Utc::now().naive_utc(),
         streaming_lsn: None,
@@ -117,13 +118,6 @@ pub async fn handle_walreceiver_connection(
     let identify = identify_system(&mut replication_client).await?;
     info!("{identify:?}");
-    connection_status.latest_connection_update = Utc::now().naive_utc();
-    if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
-        warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
-        return Ok(());
-    }
-    // NB: this is a flush_lsn, not a commit_lsn.
     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     let mut caught_up = false;
     let ZTenantTimelineId {
@@ -131,6 +125,14 @@ pub async fn handle_walreceiver_connection(
         timeline_id,
     } = id;
+    connection_status.latest_connection_update = Utc::now().naive_utc();
+    connection_status.latest_wal_update = Utc::now().naive_utc();
+    connection_status.commit_lsn = Some(end_of_wal);
+    if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
+        warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
+        return Ok(());
+    }
     let (repo, timeline) = tokio::task::spawn_blocking(move || {
         let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
             .with_context(|| format!("no repository found for tenant {tenant_id}"))?;
@@ -181,6 +183,7 @@ pub async fn handle_walreceiver_connection(
     } {
         let replication_message = replication_message?;
         let now = Utc::now().naive_utc();
+        let last_rec_lsn_before_msg = last_rec_lsn;
        // Update the connection status before processing the message. If the message processing
        // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
@@ -193,7 +196,6 @@ pub async fn handle_walreceiver_connection(
                ));
                if !xlog_data.data().is_empty() {
                    connection_status.latest_wal_update = now;
-                    connection_status.has_received_wal = true;
                }
            }
            ReplicationMessage::PrimaryKeepAlive(keepalive) => {
@@ -265,6 +267,15 @@ pub async fn handle_walreceiver_connection(
            _ => None,
        };
+        if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
+            // We have successfully processed at least one WAL record.
+            connection_status.has_processed_wal = true;
+            if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
+                warn!("Wal connection event listener dropped, aborting the connection: {e}");
+                return Ok(());
+            }
+        }
        let timeline_to_check = Arc::clone(&timeline);
        tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
            .await

View File

@@ -90,7 +90,10 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
     fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> {
         let cmd = parse_cmd(query_string)?;
-        info!("got query {:?}", query_string);
+        info!(
+            "got query {:?} in timeline {:?}",
+            query_string, self.ztimelineid
+        );
         let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. })
             || matches!(cmd, SafekeeperPostgresCommand::IdentifySystem));
@@ -106,23 +109,17 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
         }
         match cmd {
-            SafekeeperPostgresCommand::StartWalPush => {
-                ReceiveWalConn::new(pgb)
-                    .run(self)
-                    .context("failed to run ReceiveWalConn")?;
-            }
-            SafekeeperPostgresCommand::StartReplication { start_lsn } => {
-                ReplicationConn::new(pgb)
-                    .run(self, pgb, start_lsn)
-                    .context("failed to run ReplicationConn")?;
-            }
-            SafekeeperPostgresCommand::IdentifySystem => {
-                self.handle_identify_system(pgb)?;
-            }
-            SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
-                handle_json_ctrl(self, pgb, cmd)?;
-            }
+            SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb)
+                .run(self)
+                .context("failed to run ReceiveWalConn"),
+            SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb)
+                .run(self, pgb, start_lsn)
+                .context("failed to run ReplicationConn"),
+            SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb),
+            SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd),
         }
+        .context(format!("timeline {timelineid}"))?;
         Ok(())
     }
 }
@@ -153,8 +150,15 @@ impl SafekeeperPostgresHandler {
     /// Handle IDENTIFY_SYSTEM replication command
     ///
     fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
-        let start_pos = self.timeline.get().get_end_of_wal();
-        let lsn = start_pos.to_string();
+        let lsn = if self.is_walproposer_recovery() {
+            // walproposer should get all local WAL until flush_lsn
+            self.timeline.get().get_end_of_wal()
+        } else {
+            // other clients shouldn't get any uncommitted WAL
+            self.timeline.get().get_state().0.commit_lsn
+        }
+        .to_string();
         let sysid = self
             .timeline
             .get()
@@ -203,4 +207,11 @@ impl SafekeeperPostgresHandler {
             .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
         Ok(())
     }
+    /// Returns true if current connection is a replication connection, originating
+    /// from a walproposer recovery function. This connection gets a special handling:
+    /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
+    pub fn is_walproposer_recovery(&self) -> bool {
+        self.appname == Some("wal_proposer_recovery".to_string())
+    }
 }

View File

@@ -170,6 +170,7 @@ impl ReplicationConn {
         // spawn the background thread which receives HotStandbyFeedback messages.
         let bg_timeline = Arc::clone(spg.timeline.get());
         let bg_stream_in = self.stream_in.take().unwrap();
+        let bg_timeline_id = spg.ztimelineid.unwrap();
         let state = ReplicaState::new();
         // This replica_id is used below to check if it's time to stop replication.
@@ -188,6 +189,8 @@ impl ReplicationConn {
         let _ = thread::Builder::new()
             .name("HotStandbyFeedback thread".into())
             .spawn(move || {
+                let _enter =
+                    info_span!("HotStandbyFeedback thread", timeline = %bg_timeline_id).entered();
                 if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
                     error!("Replication background thread failed: {}", err);
                 }
@@ -198,13 +201,12 @@ impl ReplicationConn {
             .build()?;
         runtime.block_on(async move {
-            let (_, persisted_state) = spg.timeline.get().get_state();
+            let (inmem_state, persisted_state) = spg.timeline.get().get_state();
            // add persisted_state.timeline_start_lsn == Lsn(0) check
            if persisted_state.server.wal_seg_size == 0 {
                bail!("Cannot start replication before connecting to walproposer");
            }
-            let wal_end = spg.timeline.get().get_end_of_wal();
            // Walproposer gets special handling: safekeeper must give proposer all
            // local WAL till the end, whether committed or not (walproposer will
            // hang otherwise). That's because walproposer runs the consensus and
@@ -214,8 +216,8 @@ impl ReplicationConn {
            // another compute rises which collects majority and starts fixing log
            // on this safekeeper itself. That's ok as (old) proposer will never be
            // able to commit such WAL.
-            let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string())
-            {
+            let stop_pos: Option<Lsn> = if spg.is_walproposer_recovery() {
+                let wal_end = spg.timeline.get().get_end_of_wal();
                Some(wal_end)
            } else {
                None
@@ -226,7 +228,7 @@ impl ReplicationConn {
            // switch to copy
            pgb.write_message(&BeMessage::CopyBothResponse)?;
-            let mut end_pos = Lsn(0);
+            let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);
            let mut wal_reader = WalReader::new(
                spg.conf.timeline_dir(&spg.timeline.get().zttid),