From 477ab12b691be1e44f557d45dbe64294009259d4 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 8 Jul 2025 17:46:55 +0100 Subject: [PATCH] pageserver: touch up broker subscription reset (#12503) ## Problem The goal of this code was to test whether resetting the broker subscription helps alleviate the issues we've been seeing in staging. Looks like it did the trick. However, the original version was too eager. ## Summary of Changes Only reset the stream when: * we are waiting for WAL * there are no connection candidates lined up * we're not already connected to a safekeeper --- .../timeline/walreceiver/connection_manager.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 9b151d2449..aba94244a3 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -182,12 +182,19 @@ pub(super) async fn connection_manager_loop_step( } }, + // If we've not received any updates from the broker for a while, are waiting for WAL + // and have no safekeeper connection or connection candidates, then it might be that + // the broker subscription is wedged. Drop the current subscription and re-subscribe + // with the goal of unblocking it. _ = broker_reset_interval.tick() => { - if wait_lsn_status.borrow().is_some() { - tracing::warn!("No broker updates received for a while, but waiting for WAL. 
Re-setting stream ...") - } + let awaiting_lsn = wait_lsn_status.borrow().is_some(); + let no_candidates = connection_manager_state.wal_stream_candidates.is_empty(); + let no_connection = connection_manager_state.wal_connection.is_none(); - broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?; + if awaiting_lsn && no_candidates && no_connection { + tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ..."); + broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?; + } }, new_event = async {