Tidy up walreceiver logs (#3147)

Closes https://github.com/neondatabase/neon/issues/3114

Improves walreceiver logs and removes unnecessary `clone()` calls.
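The `clone()` removals are enabled by deriving `Copy` on `WalConnectionStatus` (see the derive change in the second file), so status values can be passed to the events sender by value. A minimal sketch of that pattern, using a simplified stand-in struct and a tokio `watch` channel in place of the actual task plumbing (the names below are illustrative, not the pageserver's real types):

```rust
use tokio::sync::watch;

// Simplified stand-in for WalConnectionStatus: once every field is Copy,
// the struct itself can derive Copy and be sent by value with no clone().
#[derive(Debug, Clone, Copy)]
struct ConnectionStatus {
    is_connected: bool,
    has_processed_wal: bool,
}

#[tokio::main]
async fn main() {
    // watch channel used here as a stand-in for the walreceiver task's events sender.
    let (events_sender, mut events_receiver) = watch::channel(ConnectionStatus {
        is_connected: false,
        has_processed_wal: false,
    });

    let status = ConnectionStatus {
        is_connected: true,
        has_processed_wal: false,
    };
    // With Copy derived, the value is copied into send() implicitly and the
    // local binding stays usable afterwards, so no .clone() is needed.
    events_sender.send(status).ok();
    assert!(status.is_connected); // still accessible after the send

    events_receiver.changed().await.ok();
    println!("latest status: {:?}", *events_receiver.borrow());
}
```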
Kirill Bulatov
2022-12-20 15:54:02 +02:00
committed by GitHub
parent 8e2edfcf39
commit 0c71dc627b
2 changed files with 63 additions and 35 deletions


@@ -145,21 +145,17 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(c) => {
match c {
TaskStateUpdate::Init | TaskStateUpdate::Started => {},
TaskStateUpdate::Progress(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status.to_owned();
}
TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {},
TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
if new_status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = new_status;
}
TaskEvent::End(walreceiver_task_result) => {
match walreceiver_task_result {
Ok(()) => debug!("WAL receiving task finished"),
@@ -210,7 +206,18 @@ async fn connection_manager_loop_step(
}
},
_ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
Some(()) = async {
match time_until_next_retry {
Some(sleep_time) => {
tokio::time::sleep(sleep_time).await;
Some(())
},
None => {
debug!("No candidates to retry, waiting indefinitely for the broker events");
None
}
}
} => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
}
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
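The retry-wait branch added above only resolves when a retry deadline exists; when there is none, the async block yields `None`, the `Some(()) = ...` pattern does not match, and `select!` keeps waiting on its other branches instead of calling `unwrap()` on an empty `Option`. A stand-alone sketch of the same pattern (the helper function, durations, and second branch here are illustrative, not the actual manager loop):

```rust
use std::time::Duration;
use tokio::time::sleep;

// When there is no retry deadline, the async block yields None: the
// `Some(()) = ...` pattern then fails to match, the branch is disabled,
// and select! keeps waiting on its other branches.
async fn wait_for_retry(time_until_next_retry: Option<Duration>) -> Option<()> {
    match time_until_next_retry {
        Some(sleep_time) => {
            sleep(sleep_time).await;
            Some(())
        }
        None => None,
    }
}

#[tokio::main]
async fn main() {
    let time_until_next_retry = Some(Duration::from_millis(10));
    tokio::select! {
        Some(()) = wait_for_retry(time_until_next_retry) => {
            println!("waking up for the next retry after waiting for {time_until_next_retry:?}");
        }
        _ = sleep(Duration::from_secs(1)) => {
            println!("no retry was scheduled; another branch woke us up");
        }
    }
}
```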
@@ -480,20 +487,25 @@ impl WalreceiverState {
.values()
.filter_map(|retry| retry.next_retry_at)
.filter(|next_retry_at| next_retry_at > &now)
.min();
.min()?;
next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
(next_retry_at - now).to_std().ok()
}
/// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
self.wal_stream_candidates.insert(
NodeId(timeline_update.safekeeper_id),
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
BrokerSkTimeline {
timeline: timeline_update,
latest_update: Utc::now().naive_utc(),
},
);
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
}
}
/// Cleans up stale broker records and checks the rest for the new connection candidate.
@@ -720,12 +732,13 @@ impl WalreceiverState {
/// Remove candidates which haven't sent broker updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
let lagging_wal_timeout = self.lagging_wal_timeout;
self.wal_stream_candidates.retain(|node_id, broker_info| {
if let Ok(time_since_latest_broker_update) =
(Utc::now().naive_utc() - broker_info.latest_update).to_std()
{
let should_retain = time_since_latest_broker_update < self.lagging_wal_timeout;
let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
if !should_retain {
node_ids_to_remove.push(*node_id);
}
@@ -735,8 +748,11 @@ impl WalreceiverState {
}
});
for node_id in node_ids_to_remove {
self.wal_connection_retries.remove(&node_id);
if !node_ids_to_remove.is_empty() {
for node_id in node_ids_to_remove {
info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
self.wal_connection_retries.remove(&node_id);
}
}
}
@@ -883,10 +899,10 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
status: connection_status.clone(),
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
@@ -1045,10 +1061,10 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
status: connection_status.clone(),
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
@@ -1110,10 +1126,10 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: NodeId(1),
status: connection_status.clone(),
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),


@@ -35,7 +35,7 @@ use pq_proto::ReplicationFeedback;
use utils::lsn::Lsn;
/// Status of the connection.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
pub is_connected: bool,
@@ -83,7 +83,7 @@ pub async fn handle_walreceiver_connection(
streaming_lsn: None,
commit_lsn: None,
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
return Ok(());
}
@@ -135,7 +135,7 @@ pub async fn handle_walreceiver_connection(
connection_status.latest_connection_update = Utc::now().naive_utc();
connection_status.latest_wal_update = Utc::now().naive_utc();
connection_status.commit_lsn = Some(end_of_wal);
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
@@ -184,7 +184,20 @@ pub async fn handle_walreceiver_connection(
replication_message = physical_stream.next() => replication_message,
}
} {
let replication_message = replication_message?;
let replication_message = match replication_message {
Ok(message) => message,
Err(replication_error) => {
if replication_error.is_closed() {
info!("Replication stream got closed");
return Ok(());
} else {
return Err(
anyhow::Error::new(replication_error).context("replication stream error")
);
}
}
};
let now = Utc::now().naive_utc();
let last_rec_lsn_before_msg = last_rec_lsn;
@@ -207,7 +220,7 @@ pub async fn handle_walreceiver_connection(
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
@@ -273,8 +286,7 @@ pub async fn handle_walreceiver_connection(
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
// We have successfully processed at least one WAL record.
connection_status.has_processed_wal = true;
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone()))
{
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}