From 123816e99ac3b150aecfc71002f53cf0b1e64bf0 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 1 Nov 2024 13:47:03 +0100 Subject: [PATCH] safekeeper: log slow WalAcceptor sends (#9564) ## Problem We don't have any observability into full WalAcceptor queues per timeline. ## Summary of changes Logs a message when a WalAcceptor send has blocked for 5 seconds, and another message when the send completes. This implies that the log frequency is at most once every 5 seconds per timeline, so we don't need further throttling. --- safekeeper/src/receive_wal.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index f97e127a17..2410e22f45 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -26,10 +26,11 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::task; use tokio::task::JoinHandle; -use tokio::time::{Duration, MissedTickBehavior}; +use tokio::time::{Duration, Instant, MissedTickBehavior}; use tracing::*; use utils::id::TenantTimelineId; use utils::lsn::Lsn; @@ -384,9 +385,29 @@ async fn read_network_loop( msg_tx: Sender, mut next_msg: ProposerAcceptorMessage, ) -> Result<(), CopyStreamHandlerEnd> { + /// Threshold for logging slow WalAcceptor sends. + const SLOW_THRESHOLD: Duration = Duration::from_secs(5); + loop { - if msg_tx.send(next_msg).await.is_err() { - return Ok(()); // chan closed, WalAcceptor terminated + let started = Instant::now(); + match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await { + Ok(()) => {} + // Slow send, log a message and keep trying. Log context has timeline ID. + Err(SendTimeoutError::Timeout(next_msg)) => { + warn!( + "slow WalAcceptor send blocked for {:.3}s", + Instant::now().duration_since(started).as_secs_f64() + ); + if msg_tx.send(next_msg).await.is_err() { + return Ok(()); // WalAcceptor terminated + } + warn!( + "slow WalAcceptor send completed after {:.3}s", + Instant::now().duration_since(started).as_secs_f64() + ) + } + // WalAcceptor terminated. + Err(SendTimeoutError::Closed(_)) => return Ok(()), } next_msg = read_message(pgb_reader).await?; }