Replace usages of wait_for_active_timeline (#4243)

This commit replaces all usages of connection_manager.rs:
wait_for_active_timeline with Timeline::wait_to_become_active.
wait_to_become_active is better and in the right module.

close https://github.com/neondatabase/neon/issues/4189

Co-authored-by: Shany Pozin <shany@neon.tech>
This commit is contained in:
Joseph Koshakow
2023-05-16 10:38:39 -04:00
committed by GitHub
parent b7db62411b
commit 511b0945c3
2 changed files with 12 additions and 36 deletions

View File

@@ -945,7 +945,7 @@ impl Timeline {
pub async fn wait_to_become_active(
&self,
_ctx: &RequestContext, /* Prepare for use by cancellation */
_ctx: &RequestContext, // Prepare for use by cancellation
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {

View File

@@ -28,8 +28,8 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::BrokerClientChannel;
use storage_broker::Streaming;
use tokio::select;
use tokio::sync::RwLock;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
@@ -50,13 +50,13 @@ pub(super) async fn connection_manager_loop_step(
ctx: &RequestContext,
manager_status: &RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = connection_manager_state
match connection_manager_state
.timeline
.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
ControlFlow::Break(()) => {
.wait_to_become_active(ctx)
.await
{
Ok(()) => {}
Err(_) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
@@ -72,6 +72,10 @@ pub(super) async fn connection_manager_loop_step(
timeline_id: connection_manager_state.timeline.timeline_id,
};
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
@@ -195,34 +199,6 @@ pub(super) async fn connection_manager_loop_step(
}
}
async fn wait_for_active_timeline(
timeline_state_updates: &mut watch::Receiver<TimelineState>,
) -> ControlFlow<(), ()> {
let current_state = *timeline_state_updates.borrow();
if current_state == TimelineState::Active {
return ControlFlow::Continue(());
}
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
TimelineState::Active => {
debug!("Timeline state changed to active, continuing the walreceiver connection manager");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
}
/// Endlessly try to subscribe for broker updates for a given timeline.
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,