diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2543764eca..86d50de132 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -945,7 +945,7 @@ impl Timeline { pub async fn wait_to_become_active( &self, - _ctx: &RequestContext, /* Prepare for use by cancellation */ + _ctx: &RequestContext, // Prepare for use by cancellation ) -> Result<(), TimelineState> { let mut receiver = self.state.subscribe(); loop { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 9cb17ea799..2305844d75 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -28,8 +28,8 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::BrokerClientChannel; use storage_broker::Streaming; +use tokio::select; use tokio::sync::RwLock; -use tokio::{select, sync::watch}; use tracing::*; use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS}; @@ -50,13 +50,13 @@ pub(super) async fn connection_manager_loop_step( ctx: &RequestContext, manager_status: &RwLock>, ) -> ControlFlow<(), ()> { - let mut timeline_state_updates = connection_manager_state + match connection_manager_state .timeline - .subscribe_for_state_updates(); - - match wait_for_active_timeline(&mut timeline_state_updates).await { - ControlFlow::Continue(()) => {} - ControlFlow::Break(()) => { + .wait_to_become_active(ctx) + .await + { + Ok(()) => {} + Err(_) => { info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); return ControlFlow::Break(()); } @@ -72,6 +72,10 @@ pub(super) async fn connection_manager_loop_step( timeline_id: connection_manager_state.timeline.timeline_id, }; + let mut timeline_state_updates = connection_manager_state + .timeline + .subscribe_for_state_updates(); + // Subscribe to the broker updates. Stream shares underlying TCP connection // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. @@ -195,34 +199,6 @@ pub(super) async fn connection_manager_loop_step( } } -async fn wait_for_active_timeline( - timeline_state_updates: &mut watch::Receiver, -) -> ControlFlow<(), ()> { - let current_state = *timeline_state_updates.borrow(); - if current_state == TimelineState::Active { - return ControlFlow::Continue(()); - } - - loop { - match timeline_state_updates.changed().await { - Ok(()) => { - let new_state = *timeline_state_updates.borrow(); - match new_state { - TimelineState::Active => { - debug!("Timeline state changed to active, continuing the walreceiver connection manager"); - return ControlFlow::Continue(()); - } - state => { - debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); - continue; - } - } - } - Err(_sender_dropped_error) => return ControlFlow::Break(()), - } - } -} - /// Endlessly try to subscribe for broker updates for a given timeline. async fn subscribe_for_timeline_updates( broker_client: &mut BrokerClientChannel,