mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
safekeeper: log slow WalAcceptor sends (#9564)
## Problem We don't have any observability into full WalAcceptor queues per timeline. ## Summary of changes Logs a message when a WalAcceptor send has blocked for 5 seconds, and another message when the send completes. This implies that the log frequency is at most once every 5 seconds per timeline, so we don't need further throttling.
This commit is contained in:
@@ -26,10 +26,11 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Duration, MissedTickBehavior};
|
||||
use tokio::time::{Duration, Instant, MissedTickBehavior};
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -384,9 +385,29 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
mut next_msg: ProposerAcceptorMessage,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
/// Threshold for logging slow WalAcceptor sends.
|
||||
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // chan closed, WalAcceptor terminated
|
||||
let started = Instant::now();
|
||||
match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
|
||||
Ok(()) => {}
|
||||
// Slow send, log a message and keep trying. Log context has timeline ID.
|
||||
Err(SendTimeoutError::Timeout(next_msg)) => {
|
||||
warn!(
|
||||
"slow WalAcceptor send blocked for {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
);
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // WalAcceptor terminated
|
||||
}
|
||||
warn!(
|
||||
"slow WalAcceptor send completed after {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
)
|
||||
}
|
||||
// WalAcceptor terminated.
|
||||
Err(SendTimeoutError::Closed(_)) => return Ok(()),
|
||||
}
|
||||
next_msg = read_message(pgb_reader).await?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user