diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index e247fbf423..2827830f02 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -266,7 +266,7 @@ impl UninitializedTimeline<'_> {
         // updated it for the layers that we created during the import.
         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
         let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
-        tl.activate(broker_client, ctx)?;
+        tl.activate(broker_client, ctx);
         Ok(tl)
     }

@@ -1333,7 +1333,7 @@ impl Tenant {
             }
         };

-        loaded_timeline.activate(broker_client, ctx)?;
+        loaded_timeline.activate(broker_client, ctx);

         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
             // Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -1481,7 +1481,10 @@ impl Tenant {

         // Stop the walreceiver first.
         debug!("waiting for wal receiver to shutdown");
-        timeline.walreceiver.stop().await;
+        let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() };
+        if let Some(walreceiver) = maybe_started_walreceiver {
+            walreceiver.stop().await;
+        }
         debug!("wal receiver shutdown confirmed");

         // Prevent new uploads from starting.
@@ -1678,30 +1681,10 @@ impl Tenant {
             tasks::start_background_loops(self);

             let mut activated_timelines = 0;
-            let mut timelines_broken_during_activation = 0;

             for timeline in not_broken_timelines {
-                match timeline
-                    .activate(broker_client.clone(), ctx)
-                    .context("timeline activation for activating tenant")
-                {
-                    Ok(()) => {
-                        activated_timelines += 1;
-                    }
-                    Err(e) => {
-                        error!(
-                            "Failed to activate timeline {}: {:#}",
-                            timeline.timeline_id, e
-                        );
-                        timeline.set_state(TimelineState::Broken);
-                        *current_state = TenantState::broken_from_reason(format!(
-                            "failed to activate timeline {}: {}",
-                            timeline.timeline_id, e
-                        ));
-
-                        timelines_broken_during_activation += 1;
-                    }
-                }
+                timeline.activate(broker_client.clone(), ctx);
+                activated_timelines += 1;
             }

             let elapsed = self.loading_started_at.elapsed();
@@ -1713,7 +1696,6 @@ impl Tenant {
                 since_creation_millis = elapsed.as_millis(),
                 tenant_id = %self.tenant_id,
                 activated_timelines,
-                timelines_broken_during_activation,
                 total_timelines,
                 post_state = <&'static str>::from(&*current_state),
                 "activation attempt finished"
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9b449812ac..b0aca45882 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -226,7 +226,7 @@ pub struct Timeline {
     /// or None if WAL receiver has not received anything for this timeline
    /// yet.
     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
-    pub walreceiver: WalReceiver,
+    pub walreceiver: Mutex<Option<WalReceiver>>,

     /// Relation size cache
     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -621,17 +621,27 @@ impl Timeline {
             .await
         {
             Ok(()) => Ok(()),
-            seqwait_error => {
+            Err(e) => {
+                // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
                 drop(_timer);
-                let walreceiver_status = self.walreceiver.status().await;
-                seqwait_error.with_context(|| format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, {}",
-                    lsn,
-                    self.get_last_record_lsn(),
-                    self.get_disk_consistent_lsn(),
-                    walreceiver_status.map(|status| status.to_human_readable_string())
-                        .unwrap_or_else(|| "WalReceiver status: Not active".to_string()),
-                ))
+                let walreceiver_status = {
+                    match &*self.walreceiver.lock().unwrap() {
+                        None => "stopping or stopped".to_string(),
+                        Some(walreceiver) => match walreceiver.status() {
+                            Some(status) => status.to_human_readable_string(),
+                            None => "Not active".to_string(),
+                        },
+                    }
+                };
+                Err(anyhow::Error::new(e).context({
+                    format!(
+                        "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
+                        lsn,
+                        self.get_last_record_lsn(),
+                        self.get_disk_consistent_lsn(),
+                        walreceiver_status,
+                    )
+                }))
             }
         }
     }
@@ -906,15 +916,10 @@ impl Timeline {
         Ok(())
     }

-    pub fn activate(
-        self: &Arc<Self>,
-        broker_client: BrokerClientChannel,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        self.launch_wal_receiver(ctx, broker_client)?;
+    pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
+        self.launch_wal_receiver(ctx, broker_client);
         self.set_state(TimelineState::Active);
         self.launch_eviction_task();
-        Ok(())
     }

     pub fn set_state(&self, new_state: TimelineState) {
@@ -1323,15 +1328,7 @@ impl Timeline {
         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));

         let tenant_conf_guard = tenant_conf.read().unwrap();
-        let wal_connect_timeout = tenant_conf_guard
-            .walreceiver_connect_timeout
-            .unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
-        let lagging_wal_timeout = tenant_conf_guard
-            .lagging_wal_timeout
-            .unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
-        let max_lsn_wal_lag = tenant_conf_guard
-            .max_lsn_wal_lag
-            .unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
+
         let evictions_low_residence_duration_metric_threshold =
             Self::get_evictions_low_residence_duration_metric_threshold(
                 &tenant_conf_guard,
@@ -1340,18 +1337,6 @@ impl Timeline {
         drop(tenant_conf_guard);

         Arc::new_cyclic(|myself| {
-            let walreceiver = WalReceiver::new(
-                TenantTimelineId::new(tenant_id, timeline_id),
-                Weak::clone(myself),
-                WalReceiverConf {
-                    wal_connect_timeout,
-                    lagging_wal_timeout,
-                    max_lsn_wal_lag,
-                    auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
-                    availability_zone: conf.availability_zone.clone(),
-                },
-            );
-
             let mut result = Timeline {
                 conf,
                 tenant_conf,
@@ -1363,7 +1348,7 @@
                 wanted_image_layers: Mutex::new(None),

                 walredo_mgr,
-                walreceiver,
+                walreceiver: Mutex::new(None),

                 remote_client: remote_client.map(Arc::new),

@@ -1483,17 +1468,49 @@
         *flush_loop_state = FlushLoopState::Running;
     }

-    pub(super) fn launch_wal_receiver(
-        &self,
+    /// Creates and starts the wal receiver.
+    ///
+    /// This function is expected to be called at most once per Timeline's lifecycle
+    /// when the timeline is activated.
+    fn launch_wal_receiver(
+        self: &Arc<Self>,
         ctx: &RequestContext,
         broker_client: BrokerClientChannel,
-    ) -> anyhow::Result<()> {
+    ) {
         info!(
             "launching WAL receiver for timeline {} of tenant {}",
             self.timeline_id, self.tenant_id
         );
-        self.walreceiver.start(ctx, broker_client)?;
-        Ok(())
+
+        let tenant_conf_guard = self.tenant_conf.read().unwrap();
+        let wal_connect_timeout = tenant_conf_guard
+            .walreceiver_connect_timeout
+            .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
+        let lagging_wal_timeout = tenant_conf_guard
+            .lagging_wal_timeout
+            .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
+        let max_lsn_wal_lag = tenant_conf_guard
+            .max_lsn_wal_lag
+            .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
+        drop(tenant_conf_guard);
+
+        let mut guard = self.walreceiver.lock().unwrap();
+        assert!(
+            guard.is_none(),
+            "multiple launches / re-launches of WAL receiver are not supported"
+        );
+        *guard = Some(WalReceiver::start(
+            Arc::clone(self),
+            WalReceiverConf {
+                wal_connect_timeout,
+                lagging_wal_timeout,
+                max_lsn_wal_lag,
+                auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
+                availability_zone: self.conf.availability_zone.clone(),
+            },
+            broker_client,
+            ctx,
+        ));
     }

     ///
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index 91f7208194..7ebf3cf172 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -29,16 +29,14 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
     connection_manager_loop_step, ConnectionManagerState,
 };

-use anyhow::Context;
 use std::future::Future;
 use std::num::NonZeroU64;
 use std::ops::ControlFlow;
-use std::sync::atomic::{self, AtomicBool};
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
 use std::time::Duration;
 use storage_broker::BrokerClientChannel;
 use tokio::select;
-use tokio::sync::{watch, RwLock};
+use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::*;

@@ -62,46 +60,23 @@ pub struct WalReceiverConf {

 pub struct WalReceiver {
     timeline: TenantTimelineId,
-    timeline_ref: Weak<Timeline>,
-    conf: WalReceiverConf,
-    started: AtomicBool,
-    manager_status: Arc<RwLock<Option<ConnectionManagerStatus>>>,
+    manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
 }

 impl WalReceiver {
-    pub fn new(
-        timeline: TenantTimelineId,
-        timeline_ref: Weak<Timeline>,
-        conf: WalReceiverConf,
-    ) -> Self {
-        Self {
-            timeline,
-            timeline_ref,
-            conf,
-            started: AtomicBool::new(false),
-            manager_status: Arc::new(RwLock::new(None)),
-        }
-    }
-
     pub fn start(
-        &self,
-        ctx: &RequestContext,
+        timeline: Arc<Timeline>,
+        conf: WalReceiverConf,
         mut broker_client: BrokerClientChannel,
-    ) -> anyhow::Result<()> {
-        if self.started.load(atomic::Ordering::Acquire) {
-            anyhow::bail!("Wal receiver is already started");
-        }
-
-        let timeline = self.timeline_ref.upgrade().with_context(|| {
-            format!("walreceiver start on a dropped timeline {}", self.timeline)
-        })?;
-
+        ctx: &RequestContext,
+    ) -> Self {
         let tenant_id = timeline.tenant_id;
         let timeline_id = timeline.timeline_id;
         let walreceiver_ctx =
             ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
-        let wal_receiver_conf = self.conf.clone();
-        let loop_status = Arc::clone(&self.manager_status);
+
+        let loop_status = Arc::new(std::sync::RwLock::new(None));
+        let manager_status = Arc::clone(&loop_status);
         task_mgr::spawn(
             WALRECEIVER_RUNTIME.handle(),
             TaskKind::WalReceiverManager,
@@ -113,7 +88,7 @@ impl WalReceiver {
                 info!("WAL receiver manager started, connecting to broker");
                 let mut connection_manager_state = ConnectionManagerState::new(
                     timeline,
-                    wal_receiver_conf,
+                    conf,
                 );
                 loop {
                     select! {
@@ -137,29 +112,29 @@
                 }

                 connection_manager_state.shutdown().await;
-                *loop_status.write().await = None;
+                *loop_status.write().unwrap() = None;
                 Ok(())
             }
             .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
         );

-        self.started.store(true, atomic::Ordering::Release);
-
-        Ok(())
+        Self {
+            timeline: TenantTimelineId::new(tenant_id, timeline_id),
+            manager_status,
+        }
     }

-    pub async fn stop(&self) {
+    pub async fn stop(self) {
         task_mgr::shutdown_tasks(
             Some(TaskKind::WalReceiverManager),
             Some(self.timeline.tenant_id),
             Some(self.timeline.timeline_id),
         )
         .await;
-        self.started.store(false, atomic::Ordering::Release);
     }

-    pub(super) async fn status(&self) -> Option<ConnectionManagerStatus> {
-        self.manager_status.read().await.clone()
+    pub(super) fn status(&self) -> Option<ConnectionManagerStatus> {
+        self.manager_status.read().unwrap().clone()
     }
 }
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 3da1f023e1..6b65e1fd42 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -29,7 +29,6 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
 use storage_broker::BrokerClientChannel;
 use storage_broker::Streaming;
 use tokio::select;
-use tokio::sync::RwLock;
 use tracing::*;

 use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
@@ -48,7 +47,7 @@ pub(super) async fn connection_manager_loop_step(
     broker_client: &mut BrokerClientChannel,
     connection_manager_state: &mut ConnectionManagerState,
     ctx: &RequestContext,
-    manager_status: &RwLock<Option<ConnectionManagerStatus>>,
+    manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
 ) -> ControlFlow<(), ()> {
     match connection_manager_state
         .timeline
@@ -195,7 +194,7 @@ pub(super) async fn connection_manager_loop_step(
                 .change_connection(new_candidate, ctx)
                 .await
         }

-        *manager_status.write().await = Some(connection_manager_state.manager_status());
+        *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     }
 }