diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 7fac7d2ac0..03a4ff8c8e 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -177,9 +177,9 @@ impl UninitializedTimeline<'_> {
     ///
     /// The new timeline is initialized in Active state, and its background jobs are
     /// started
-    pub fn initialize(self, _ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
+    pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
-        self.initialize_with_lock(&mut timelines, true, true)
+        self.initialize_with_lock(ctx, &mut timelines, true, true)
     }
 
     /// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
@@ -189,6 +189,7 @@ impl UninitializedTimeline<'_> {
     /// been initialized.
     fn initialize_with_lock(
         mut self,
+        ctx: &RequestContext,
         timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
         load_layer_map: bool,
         activate: bool,
@@ -229,7 +230,9 @@ impl UninitializedTimeline<'_> {
                 new_timeline.maybe_spawn_flush_loop();
 
                 if activate {
-                    new_timeline.activate();
+                    new_timeline
+                        .activate(ctx)
+                        .context("initializing timeline activation")?;
                 }
             }
         }
@@ -469,7 +472,7 @@ impl Tenant {
         local_metadata: Option<TimelineMetadata>,
         ancestor: Option<Arc<Timeline>>,
         first_save: bool,
-        _ctx: &RequestContext,
+        ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let tenant_id = self.tenant_id;
 
@@ -504,7 +507,7 @@ impl Tenant {
         // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
         // But we shouldn't start walreceiver before we have all the data locally, because working walreceiver
         // will ingest data which may require looking at the layers which are not yet available locally
-        match timeline.initialize_with_lock(&mut timelines_accessor, true, false) {
+        match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
             Ok(new_timeline) => new_timeline,
             Err(e) => {
                 error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -629,7 +632,7 @@ impl Tenant {
     ///
     /// Background task that downloads all data for a tenant and brings it to Active state.
     ///
-    #[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
     async fn attach(self: &Arc<Self>, ctx: RequestContext) -> anyhow::Result<()> {
         // Create directory with marker file to indicate attaching state.
         // The load_local_tenants() function in tenant::mgr relies on the marker file
@@ -750,7 +753,7 @@ impl Tenant {
 
         // Start background operations and open the tenant for business.
         // The loops will shut themselves down when they notice that the tenant is inactive.
-        self.activate()?;
+        self.activate(&ctx)?;
 
         info!("Done");
 
@@ -1022,7 +1025,7 @@ impl Tenant {
 
         // Start background operations and open the tenant for business.
         // The loops will shut themselves down when they notice that the tenant is inactive.
-        self.activate()?;
+        self.activate(ctx)?;
 
         info!("Done");
 
@@ -1358,12 +1361,7 @@ impl Tenant {
 
         // Stop the walreceiver first.
         debug!("waiting for wal receiver to shutdown");
-        task_mgr::shutdown_tasks(
-            Some(TaskKind::WalReceiverManager),
-            Some(self.tenant_id),
-            Some(timeline_id),
-        )
-        .await;
+        timeline.walreceiver.stop().await;
         debug!("wal receiver shutdown confirmed");
 
         info!("waiting for timeline tasks to shutdown");
@@ -1450,7 +1448,7 @@ impl Tenant {
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
-    fn activate(&self) -> anyhow::Result<()> {
+    fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
         let mut result = Ok(());
         self.state.send_modify(|current_state| {
             match *current_state {
@@ -1484,7 +1482,20 @@ impl Tenant {
                     tasks::start_background_loops(self.tenant_id);
 
                     for timeline in not_broken_timelines {
-                        timeline.activate();
+                        match timeline
+                            .activate(ctx)
+                            .context("timeline activation for activating tenant")
+                        {
+                            Ok(()) => {}
+                            Err(e) => {
+                                error!(
+                                    "Failed to activate timeline {}: {:#}",
+                                    timeline.timeline_id, e
+                                );
+                                timeline.set_state(TimelineState::Broken);
+                                *current_state = TenantState::Broken;
+                            }
+                        }
                     }
                 }
             }
@@ -2093,7 +2104,7 @@ impl Tenant {
         src_timeline: &Arc<Timeline>,
         dst_id: TimelineId,
         start_lsn: Option<Lsn>,
-        _ctx: &RequestContext,
+        ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let src_id = src_timeline.timeline_id;
 
@@ -2186,7 +2197,7 @@ impl Tenant {
                 false,
                 Some(Arc::clone(src_timeline)),
             )?
-            .initialize_with_lock(&mut timelines, true, true)?;
+            .initialize_with_lock(ctx, &mut timelines, true, true)?;
         drop(timelines);
 
         // Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2299,7 +2310,7 @@ impl Tenant {
 
         let timeline = {
             let mut timelines = self.timelines.lock().unwrap();
-            raw_timeline.initialize_with_lock(&mut timelines, false, true)?
+            raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
         };
 
         info!(
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index e80e32644b..4b0d7a6994 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -14,6 +14,7 @@ use pageserver_api::models::{
     DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
 };
 use remote_storage::GenericRemoteStorage;
+use storage_broker::BrokerClientChannel;
 use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -30,7 +31,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};
 
-use crate::broker_client::is_broker_client_initialized;
+use crate::broker_client::{get_broker_client, is_broker_client_initialized};
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
 use crate::tenant::storage_layer::{
@@ -71,10 +72,10 @@ use crate::walredo::WalRedoManager;
 use crate::METADATA_FILE_NAME;
 use crate::ZERO_PAGE;
 use crate::{is_temporary, task_mgr};
-use walreceiver::spawn_connection_manager_task;
 
 pub(super) use self::eviction_task::EvictionTaskTenantState;
 use self::eviction_task::EvictionTaskTimelineState;
+use self::walreceiver::{WalReceiver, WalReceiverConf};
 
 use super::layer_map::BatchedUpdates;
 use super::remote_timeline_client::index::IndexPart;
@@ -214,6 +215,7 @@ pub struct Timeline {
     /// or None if WAL receiver has not received anything for this timeline
     /// yet.
     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
+    pub walreceiver: WalReceiver,
 
     /// Relation size cache
     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -866,10 +868,18 @@ impl Timeline {
         Ok(())
     }
 
-    pub fn activate(self: &Arc<Self>) {
+    pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
+        if is_broker_client_initialized() {
+            self.launch_wal_receiver(ctx, get_broker_client().clone())?;
+        } else if cfg!(test) {
+            info!("not launching WAL receiver because broker client hasn't been initialized");
+        } else {
+            anyhow::bail!("broker client not initialized");
+        }
+
         self.set_state(TimelineState::Active);
-        self.launch_wal_receiver();
         self.launch_eviction_task();
+        Ok(())
     }
 
     pub fn set_state(&self, new_state: TimelineState) {
@@ -1220,7 +1230,31 @@ impl Timeline {
         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
 
+        let tenant_conf_guard = tenant_conf.read().unwrap();
+        let wal_connect_timeout = tenant_conf_guard
+            .walreceiver_connect_timeout
+            .unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
+        let lagging_wal_timeout = tenant_conf_guard
+            .lagging_wal_timeout
+            .unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
+        let max_lsn_wal_lag = tenant_conf_guard
+            .max_lsn_wal_lag
+            .unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
+        drop(tenant_conf_guard);
+
         Arc::new_cyclic(|myself| {
+            let walreceiver = WalReceiver::new(
+                TenantTimelineId::new(tenant_id, timeline_id),
+                Weak::clone(myself),
+                WalReceiverConf {
+                    wal_connect_timeout,
+                    lagging_wal_timeout,
+                    max_lsn_wal_lag,
+                    auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
+                    availability_zone: conf.availability_zone.clone(),
+                },
+            );
             let mut result = Timeline {
                 conf,
                 tenant_conf,
@@ -1231,6 +1265,7 @@ impl Timeline {
                 layers: RwLock::new(LayerMap::default()),
 
                 walredo_mgr,
+                walreceiver,
 
                 remote_client: remote_client.map(Arc::new),
 
@@ -1350,44 +1385,17 @@ impl Timeline {
         *flush_loop_state = FlushLoopState::Running;
     }
 
-    pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
-        if !is_broker_client_initialized() {
-            if cfg!(test) {
-                info!("not launching WAL receiver because broker client hasn't been initialized");
-                return;
-            } else {
-                panic!("broker client not initialized");
-            }
-        }
-
+    pub(super) fn launch_wal_receiver(
+        &self,
+        ctx: &RequestContext,
+        broker_client: BrokerClientChannel,
+    ) -> anyhow::Result<()> {
         info!(
             "launching WAL receiver for timeline {} of tenant {}",
             self.timeline_id, self.tenant_id
         );
-        let tenant_conf_guard = self.tenant_conf.read().unwrap();
-        let lagging_wal_timeout = tenant_conf_guard
-            .lagging_wal_timeout
-            .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
-        let walreceiver_connect_timeout = tenant_conf_guard
-            .walreceiver_connect_timeout
-            .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
-        let max_lsn_wal_lag = tenant_conf_guard
-            .max_lsn_wal_lag
-            .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
-        drop(tenant_conf_guard);
-        let self_clone = Arc::clone(self);
-        let background_ctx =
-            // XXX: this is a detached_child. Plumb through the ctx from call sites.
-            RequestContext::todo_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
-        spawn_connection_manager_task(
-            self_clone,
-            walreceiver_connect_timeout,
-            lagging_wal_timeout,
-            max_lsn_wal_lag,
-            crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
-            self.conf.availability_zone.clone(),
-            background_ctx,
-        );
+        self.walreceiver.start(ctx, broker_client)?;
+        Ok(())
     }
 
     ///
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index f33a12c5cc..00f446af38 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -23,14 +23,133 @@
 mod connection_manager;
 mod walreceiver_connection;
 
-use crate::task_mgr::WALRECEIVER_RUNTIME;
+use crate::context::{DownloadBehavior, RequestContext};
+use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
+use crate::tenant::timeline::walreceiver::connection_manager::{
+    connection_manager_loop_step, ConnectionManagerState,
+};
 
+use anyhow::Context;
 use std::future::Future;
+use std::num::NonZeroU64;
+use std::ops::ControlFlow;
+use std::sync::atomic::{self, AtomicBool};
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+use storage_broker::BrokerClientChannel;
+use tokio::select;
 use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 
-pub use connection_manager::spawn_connection_manager_task;
+use utils::id::TenantTimelineId;
+
+use super::Timeline;
+
+#[derive(Clone)]
+pub struct WalReceiverConf {
+    /// The timeout on the connection to safekeeper for WAL streaming.
+    pub wal_connect_timeout: Duration,
+    /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
+    pub lagging_wal_timeout: Duration,
+    /// The Lsn lag to use to determine when the current connection is lagging too much behind and reconnect to the other one.
+    pub max_lsn_wal_lag: NonZeroU64,
+    pub auth_token: Option<Arc<String>>,
+    pub availability_zone: Option<String>,
+}
+
+pub struct WalReceiver {
+    timeline: TenantTimelineId,
+    timeline_ref: Weak<Timeline>,
+    conf: WalReceiverConf,
+    started: AtomicBool,
+}
+
+impl WalReceiver {
+    pub fn new(
+        timeline: TenantTimelineId,
+        timeline_ref: Weak<Timeline>,
+        conf: WalReceiverConf,
+    ) -> Self {
+        Self {
+            timeline,
+            timeline_ref,
+            conf,
+            started: AtomicBool::new(false),
+        }
+    }
+
+    pub fn start(
+        &self,
+        ctx: &RequestContext,
+        mut broker_client: BrokerClientChannel,
+    ) -> anyhow::Result<()> {
+        if self.started.load(atomic::Ordering::Acquire) {
+            anyhow::bail!("Wal receiver is already started");
+        }
+
+        let timeline = self.timeline_ref.upgrade().with_context(|| {
+            format!("walreceiver start on a dropped timeline {}", self.timeline)
+        })?;
+
+        let tenant_id = timeline.tenant_id;
+        let timeline_id = timeline.timeline_id;
+        let walreceiver_ctx =
+            ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
+
+        let wal_receiver_conf = self.conf.clone();
+        task_mgr::spawn(
+            WALRECEIVER_RUNTIME.handle(),
+            TaskKind::WalReceiverManager,
+            Some(tenant_id),
+            Some(timeline_id),
+            &format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
+            false,
+            async move {
+                info!("WAL receiver manager started, connecting to broker");
+                let mut connection_manager_state = ConnectionManagerState::new(
+                    timeline,
+                    wal_receiver_conf,
+                );
+                loop {
+                    select! {
+                        _ = task_mgr::shutdown_watcher() => {
+                            info!("WAL receiver shutdown requested, shutting down");
+                            connection_manager_state.shutdown().await;
+                            return Ok(());
+                        },
+                        loop_step_result = connection_manager_loop_step(
+                            &mut broker_client,
+                            &mut connection_manager_state,
+                            &walreceiver_ctx,
+                        ) => match loop_step_result {
+                            ControlFlow::Continue(()) => continue,
+                            ControlFlow::Break(()) => {
+                                info!("Connection manager loop ended, shutting down");
+                                connection_manager_state.shutdown().await;
+                                return Ok(());
+                            }
+                        },
+                    }
+                }
+            }.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
+        );
+
+        self.started.store(true, atomic::Ordering::Release);
+
+        Ok(())
+    }
+
+    pub async fn stop(&self) {
+        task_mgr::shutdown_tasks(
+            Some(TaskKind::WalReceiverManager),
+            Some(self.timeline.tenant_id),
+            Some(self.timeline.timeline_id),
+        )
+        .await;
+        self.started.store(false, atomic::Ordering::Release);
+    }
+}
 
 /// A handle of an asynchronous task.
 /// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
@@ -39,26 +158,26 @@ pub use connection_manager::spawn_connection_manager_task;
 /// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the newer one on submission.
 /// That may lead to certain events not being observed by the listener.
 #[derive(Debug)]
-pub struct TaskHandle<E> {
+struct TaskHandle<E> {
     join_handle: Option<JoinHandle<anyhow::Result<()>>>,
     events_receiver: watch::Receiver<TaskStateUpdate<E>>,
     cancellation: CancellationToken,
 }
 
-pub enum TaskEvent<E> {
+enum TaskEvent<E> {
     Update(TaskStateUpdate<E>),
     End(anyhow::Result<()>),
 }
 
 #[derive(Debug, Clone)]
-pub enum TaskStateUpdate<E> {
+enum TaskStateUpdate<E> {
     Started,
     Progress(E),
 }
 
 impl<E: Clone> TaskHandle<E> {
     /// Initializes the task, starting it immediately after the creation.
-    pub fn spawn<Fut>(
+    fn spawn<Fut>(
         task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
     ) -> Self
     where
@@ -131,7 +250,7 @@ impl<E: Clone> TaskHandle<E> {
     }
 
     /// Aborts current task, waiting for it to finish.
-    pub async fn shutdown(self) {
+    async fn shutdown(self) {
         if let Some(jh) = self.join_handle {
             self.cancellation.cancel();
             match jh.await {
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index de07676ffe..efcbfbce3d 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -11,11 +11,9 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
 
-use super::TaskStateUpdate;
-use crate::broker_client::get_broker_client;
+use super::{TaskStateUpdate, WalReceiverConf};
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::WALRECEIVER_RUNTIME;
-use crate::task_mgr::{self, TaskKind};
+use crate::task_mgr::TaskKind;
 use crate::tenant::Timeline;
 use anyhow::Context;
 use chrono::{NaiveDateTime, Utc};
@@ -38,75 +36,17 @@ use utils::{
 
 use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
 
-/// Spawns the loop to take care of the timeline's WAL streaming connection.
-pub fn spawn_connection_manager_task(
-    timeline: Arc<Timeline>,
-    wal_connect_timeout: Duration,
-    lagging_wal_timeout: Duration,
-    max_lsn_wal_lag: NonZeroU64,
-    auth_token: Option<Arc<String>>,
-    availability_zone: Option<String>,
-    ctx: RequestContext,
-) {
-    let mut broker_client = get_broker_client().clone();
-
-    let tenant_id = timeline.tenant_id;
-    let timeline_id = timeline.timeline_id;
-
-    task_mgr::spawn(
-        WALRECEIVER_RUNTIME.handle(),
-        TaskKind::WalReceiverManager,
-        Some(tenant_id),
-        Some(timeline_id),
-        &format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
-        false,
-        async move {
-            info!("WAL receiver manager started, connecting to broker");
-            let mut walreceiver_state = WalreceiverState::new(
-                timeline,
-                wal_connect_timeout,
-                lagging_wal_timeout,
-                max_lsn_wal_lag,
-                auth_token,
-                availability_zone,
-            );
-            loop {
-                select! {
-                    _ = task_mgr::shutdown_watcher() => {
-                        info!("WAL receiver shutdown requested, shutting down");
-                        walreceiver_state.shutdown().await;
-                        return Ok(());
-                    },
-                    loop_step_result = connection_manager_loop_step(
-                        &mut broker_client,
-                        &mut walreceiver_state,
-                        &ctx,
-                    ) => match loop_step_result {
-                        ControlFlow::Continue(()) => continue,
-                        ControlFlow::Break(()) => {
-                            info!("Connection manager loop ended, shutting down");
-                            walreceiver_state.shutdown().await;
-                            return Ok(());
-                        }
-                    },
-                }
-            }
-        }
-        .instrument(
-            info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
-        ),
-    );
-}
-
 /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
 /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
 /// If storage broker subscription is cancelled, exits.
-async fn connection_manager_loop_step(
+pub(super) async fn connection_manager_loop_step(
     broker_client: &mut BrokerClientChannel,
-    walreceiver_state: &mut WalreceiverState,
+    connection_manager_state: &mut ConnectionManagerState,
     ctx: &RequestContext,
 ) -> ControlFlow<(), ()> {
-    let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
+    let mut timeline_state_updates = connection_manager_state
+        .timeline
+        .subscribe_for_state_updates();
 
     match wait_for_active_timeline(&mut timeline_state_updates).await {
         ControlFlow::Continue(()) => {}
@@ -117,8 +57,8 @@
     }
 
     let id = TenantTimelineId {
-        tenant_id: walreceiver_state.timeline.tenant_id,
-        timeline_id: walreceiver_state.timeline.timeline_id,
+        tenant_id: connection_manager_state.timeline.tenant_id,
+        timeline_id: connection_manager_state.timeline.timeline_id,
     };
 
     // Subscribe to the broker updates. Stream shares underlying TCP connection
@@ -128,7 +68,7 @@
     info!("Subscribed for broker timeline updates");
 
     loop {
-        let time_until_next_retry = walreceiver_state.time_until_next_retry();
+        let time_until_next_retry = connection_manager_state.time_until_next_retry();
 
         // These things are happening concurrently:
         //
@@ -141,12 +81,12 @@
         // - timeline state changes to something that does not allow walreceiver to run concurrently
         select! {
             Some(wal_connection_update) = async {
-                match walreceiver_state.wal_connection.as_mut() {
+                match connection_manager_state.wal_connection.as_mut() {
                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
                     None => None,
                 }
             } => {
-                let wal_connection = walreceiver_state.wal_connection.as_mut()
+                let wal_connection = connection_manager_state.wal_connection.as_mut()
                     .expect("Should have a connection, as checked by the corresponding select! guard");
                 match wal_connection_update {
                     TaskEvent::Update(TaskStateUpdate::Started) => {},
@@ -156,7 +96,7 @@
                             // from this safekeeper. This is good enough to clean unsuccessful
                             // retries history and allow reconnecting to this safekeeper without
                             // sleeping for a long time.
-                            walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
+                            connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
                         }
                         wal_connection.status = new_status;
                     }
@@ -165,7 +105,7 @@
                             Ok(()) => debug!("WAL receiving task finished"),
                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
                         }
-                        walreceiver_state.drop_old_connection(false).await;
+                        connection_manager_state.drop_old_connection(false).await;
                     },
                 }
             },
@@ -173,7 +113,7 @@
 
             // Got a new update from the broker
             broker_update = broker_subscription.message() => {
                 match broker_update {
-                    Ok(Some(broker_update)) => walreceiver_state.register_timeline_update(broker_update),
+                    Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
                     Err(e) => {
                         error!("broker subscription failed: {e}");
                         return ControlFlow::Continue(());
@@ -187,12 +127,12 @@
 
             new_event = async {
                 loop {
-                    if walreceiver_state.timeline.current_state() == TimelineState::Loading {
+                    if connection_manager_state.timeline.current_state() == TimelineState::Loading {
                         warn!("wal connection manager should only be launched after timeline has become active");
                     }
                     match timeline_state_updates.changed().await {
                         Ok(()) => {
-                            let new_state = walreceiver_state.timeline.current_state();
+                            let new_state = connection_manager_state.timeline.current_state();
                             match new_state {
                                 // we're already active as walreceiver, no need to reactivate
                                 TimelineState::Active => continue,
@@ -234,9 +174,9 @@
             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
         }
 
-        if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
+        if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
             info!("Switching to new connection candidate: {new_candidate:?}");
-            walreceiver_state
+            connection_manager_state
                 .change_connection(new_candidate, ctx)
                 .await
         }
@@ -314,25 +254,17 @@ const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
 const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
 
 /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
-struct WalreceiverState {
+pub(super) struct ConnectionManagerState {
     id: TenantTimelineId,
-
     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     timeline: Arc<Timeline>,
-    /// The timeout on the connection to safekeeper for WAL streaming.
-    wal_connect_timeout: Duration,
-    /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
-    lagging_wal_timeout: Duration,
-    /// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one.
-    max_lsn_wal_lag: NonZeroU64,
+    conf: WalReceiverConf,
     /// Current connection to safekeeper for WAL streaming.
     wal_connection: Option<WalConnection>,
     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
-    auth_token: Option<Arc<String>>,
-    availability_zone: Option<String>,
 }
 
 /// Current connection data.
@@ -375,15 +307,8 @@ struct BrokerSkTimeline {
     latest_update: NaiveDateTime,
 }
 
-impl WalreceiverState {
-    fn new(
-        timeline: Arc<Timeline>,
-        wal_connect_timeout: Duration,
-        lagging_wal_timeout: Duration,
-        max_lsn_wal_lag: NonZeroU64,
-        auth_token: Option<Arc<String>>,
-        availability_zone: Option<String>,
-    ) -> Self {
+impl ConnectionManagerState {
+    pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
         let id = TenantTimelineId {
             tenant_id: timeline.tenant_id,
             timeline_id: timeline.timeline_id,
@@ -391,14 +316,10 @@
         Self {
             id,
             timeline,
-            wal_connect_timeout,
-            lagging_wal_timeout,
-            max_lsn_wal_lag,
+            conf,
            wal_connection: None,
            wal_stream_candidates: HashMap::new(),
            wal_connection_retries: HashMap::new(),
-            auth_token,
-            availability_zone,
         }
     }
 
@@ -407,7 +328,7 @@
         self.drop_old_connection(true).await;
 
         let id = self.id;
-        let connect_timeout = self.wal_connect_timeout;
+        let connect_timeout = self.conf.wal_connect_timeout;
         let timeline = Arc::clone(&self.timeline);
         let ctx = ctx.detached_child(
             TaskKind::WalReceiverConnectionHandler,
@@ -563,7 +484,7 @@
             (now - existing_wal_connection.status.latest_connection_update).to_std()
         {
             // Drop connection if we haven't received keepalive message for a while.
-            if latest_interaciton > self.wal_connect_timeout {
+            if latest_interaciton > self.conf.wal_connect_timeout {
                 return Some(NewWalConnectionCandidate {
                     safekeeper_id: new_sk_id,
                     wal_source_connconf: new_wal_source_connconf,
@@ -573,7 +494,7 @@
                         existing_wal_connection.status.latest_connection_update,
                     ),
                     check_time: now,
-                    threshold: self.wal_connect_timeout,
+                    threshold: self.conf.wal_connect_timeout,
                 },
             });
         }
@@ -589,7 +510,7 @@
         // Check if the new candidate has much more WAL than the current one.
         match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
             Some(new_sk_lsn_advantage) => {
-                if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
+                if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
                     return Some(NewWalConnectionCandidate {
                         safekeeper_id: new_sk_id,
                         wal_source_connconf: new_wal_source_connconf,
@@ -597,16 +518,16 @@
                         reason: ReconnectReason::LaggingWal {
                             current_commit_lsn,
                             new_commit_lsn,
-                            threshold: self.max_lsn_wal_lag,
+                            threshold: self.conf.max_lsn_wal_lag,
                         },
                     });
                 }
                 // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
                 // and the current one is not, switch to the new one.
-                if self.availability_zone.is_some()
+                if self.conf.availability_zone.is_some()
                     && existing_wal_connection.availability_zone
-                        != self.availability_zone
-                    && self.availability_zone == new_availability_zone
+                        != self.conf.availability_zone
+                    && self.conf.availability_zone == new_availability_zone
                 {
                     return Some(NewWalConnectionCandidate {
                         safekeeper_id: new_sk_id,
@@ -677,7 +598,7 @@
         if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
             if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
                 if candidate_commit_lsn > current_commit_lsn
-                    && waiting_for_new_wal > self.lagging_wal_timeout
+                    && waiting_for_new_wal > self.conf.lagging_wal_timeout
                 {
                     return Some(NewWalConnectionCandidate {
                         safekeeper_id: new_sk_id,
@@ -691,7 +612,7 @@
                             existing_wal_connection.status.latest_wal_update,
                         ),
                         check_time: now,
-                        threshold: self.lagging_wal_timeout,
+                        threshold: self.conf.lagging_wal_timeout,
                     },
                 });
             }
@@ -757,11 +678,11 @@
             match wal_stream_connection_config(
                 self.id,
                 info.safekeeper_connstr.as_ref(),
-                match &self.auth_token {
+                match &self.conf.auth_token {
                     None => None,
                     Some(x) => Some(x),
                 },
-                self.availability_zone.as_deref(),
+                self.conf.availability_zone.as_deref(),
             ) {
                 Ok(connstr) => Some((*sk_id, info, connstr)),
                 Err(e) => {
@@ -775,7 +696,7 @@
     /// Remove candidates which haven't sent broker updates for a while.
     fn cleanup_old_candidates(&mut self) {
         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
-        let lagging_wal_timeout = self.lagging_wal_timeout;
+        let lagging_wal_timeout = self.conf.lagging_wal_timeout;
 
         self.wal_stream_candidates.retain(|node_id, broker_info| {
             if let Ok(time_since_latest_broker_update) =
@@ -799,7 +720,7 @@
         }
     }
 
-    async fn shutdown(mut self) {
+    pub(super) async fn shutdown(mut self) {
         if let Some(wal_connection) = self.wal_connection.take() {
             wal_connection.connection_task.shutdown().await;
         }
@@ -903,7 +824,7 @@ mod tests {
         let mut state = dummy_state(&harness).await;
         let now = Utc::now().naive_utc();
 
-        let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
+        let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
 
         state.wal_connection = None;
@@ -914,7 +835,7 @@
             (
                 NodeId(3),
                 dummy_broker_sk_timeline(
-                    1 + state.max_lsn_wal_lag.get(),
+                    1 + state.conf.max_lsn_wal_lag.get(),
                     "delay_over_threshold",
                     delay_over_threshold,
                 ),
@@ -948,7 +869,7 @@
             streaming_lsn: Some(Lsn(current_lsn)),
         };
 
-        state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
+        state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
         state.wal_connection = Some(WalConnection {
             started_at: now,
             sk_id: connected_sk_id,
@@ -966,7 +887,7 @@
             (
                 connected_sk_id,
                 dummy_broker_sk_timeline(
-                    current_lsn + state.max_lsn_wal_lag.get() * 2,
+                    current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
                     DUMMY_SAFEKEEPER_HOST,
                     now,
                 ),
@@ -978,7 +899,7 @@
             (
                 NodeId(2),
                 dummy_broker_sk_timeline(
-                    current_lsn + state.max_lsn_wal_lag.get() / 2,
+                    current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
                     "not_enough_advanced_lsn",
                     now,
                 ),
@@ -1003,7 +924,11 @@
         state.wal_connection = None;
         state.wal_stream_candidates = HashMap::from([(
             NodeId(0),
-            dummy_broker_sk_timeline(1 + state.max_lsn_wal_lag.get(), DUMMY_SAFEKEEPER_HOST, now),
+            dummy_broker_sk_timeline(
+                1 + state.conf.max_lsn_wal_lag.get(),
+                DUMMY_SAFEKEEPER_HOST,
+                now,
+            ),
         )]);
 
         let only_candidate = state
@@ -1101,7 +1026,7 @@
         let now = Utc::now().naive_utc();
 
         let connected_sk_id = NodeId(0);
-        let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1);
+        let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
 
         let connection_status = WalConnectionStatus {
             is_connected: true,
@@ -1146,7 +1071,7 @@
             ReconnectReason::LaggingWal {
                 current_commit_lsn: current_lsn,
                 new_commit_lsn: new_lsn,
-                threshold: state.max_lsn_wal_lag
+                threshold: state.conf.max_lsn_wal_lag
             },
             "Should select bigger WAL safekeeper if it starts to lag enough"
         );
@@ -1165,7 +1090,7 @@
         let current_lsn = Lsn(100_000).align();
         let now = Utc::now().naive_utc();
 
-        let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?;
+        let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
         let time_over_threshold =
             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
 
@@ -1208,7 +1133,7 @@
                 ..
             } => {
                 assert_eq!(last_keep_alive, Some(time_over_threshold));
-                assert_eq!(threshold, state.lagging_wal_timeout);
+                assert_eq!(threshold, state.conf.lagging_wal_timeout);
             }
             unexpected => panic!("Unexpected reason: {unexpected:?}"),
         }
@@ -1228,7 +1153,7 @@
         let new_lsn = Lsn(100_100).align();
         let now = Utc::now().naive_utc();
 
-        let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
+        let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
         let time_over_threshold =
             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
 
@@ -1275,7 +1200,7 @@
                 assert_eq!(current_commit_lsn, current_lsn);
                 assert_eq!(candidate_commit_lsn, new_lsn);
                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
-                assert_eq!(threshold, state.lagging_wal_timeout);
+                assert_eq!(threshold, state.conf.lagging_wal_timeout);
             }
             unexpected => panic!("Unexpected reason: {unexpected:?}"),
        }
@@ -1289,27 +1214,29 @@
 
     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
 
-    async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
+    async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
         let (tenant, ctx) = harness.load().await;
         let timeline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
             .expect("Failed to create an empty timeline for dummy wal connection manager");
         let timeline = timeline.initialize(&ctx).unwrap();
 
-        WalreceiverState {
+        ConnectionManagerState {
             id: TenantTimelineId {
                 tenant_id: harness.tenant_id,
                 timeline_id: TIMELINE_ID,
             },
             timeline,
-            wal_connect_timeout: Duration::from_secs(1),
-            lagging_wal_timeout: Duration::from_secs(1),
-            max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
+            conf: WalReceiverConf {
+                wal_connect_timeout: Duration::from_secs(1),
+                lagging_wal_timeout: Duration::from_secs(1),
+                max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
+                auth_token: None,
+                availability_zone: None,
+            },
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
             wal_connection_retries: HashMap::new(),
-            auth_token: None,
-            availability_zone: None,
         }
     }
 
@@ -1321,7 +1248,7 @@
         let harness = TenantHarness::create("switch_to_same_availability_zone")?;
         let mut state = dummy_state(&harness).await;
-        state.availability_zone = test_az.clone();
+        state.conf.availability_zone = test_az.clone();
 
         let current_lsn = Lsn(100_000).align();
         let now = Utc::now().naive_utc();
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index ea2f2392ea..d5099dc2a5 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -42,7 +42,7 @@ use utils::lsn::Lsn;
 
 /// Status of the connection.
 #[derive(Debug, Clone, Copy)]
-pub struct WalConnectionStatus {
+pub(super) struct WalConnectionStatus {
     /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
     pub is_connected: bool,
     /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
@@ -60,7 +60,7 @@
 
 /// Open a connection to the given safekeeper and receive WAL, sending back progress
 /// messages as we go.
-pub async fn handle_walreceiver_connection(
+pub(super) async fn handle_walreceiver_connection(
     timeline: Arc<Timeline>,
     wal_source_connconf: PgConnectionConfig,
     events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,