diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index c4cfd0656e..2bda795eb4 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -108,7 +108,6 @@ impl Env { &timeline_dir, &remote_path, shared_state, - todo!(), conf.clone(), wal_backup.clone(), ); @@ -118,6 +117,7 @@ impl Env { Arc::new(TimelinesSet::default()), // ignored for now RateLimiter::new(0, 0), wal_backup, + todo!(), ); Ok(timeline) } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index f1e0710382..baf29f0b48 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -450,7 +450,6 @@ pub struct Timeline { /// synchronized with the disk. This is tokio mutex as we write WAL to disk /// while holding it, ensuring that consensus checks are in order. mutex: RwLock, - pub(crate) wal_advertiser: Arc, walsenders: Arc, walreceivers: Arc, timeline_dir: Utf8PathBuf, @@ -482,7 +481,6 @@ impl Timeline { timeline_dir: &Utf8Path, remote_path: &RemotePath, shared_state: SharedState, - wal_advertiser: Arc, conf: Arc, wal_backup: Arc, ) -> Arc { @@ -507,7 +505,6 @@ impl Timeline { shared_state_version_tx, shared_state_version_rx, mutex: RwLock::new(shared_state), - wal_advertiser, walsenders: WalSenders::new(walreceivers.clone()), walreceivers, gate: Default::default(), @@ -528,7 +525,6 @@ impl Timeline { conf: Arc, ttid: TenantTimelineId, wal_backup: Arc, - wal_advertiser: Arc, ) -> Result> { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); @@ -541,7 +537,6 @@ impl Timeline { &timeline_dir, &remote_path, shared_state, - wal_advertiser, conf, wal_backup, )) @@ -555,6 +550,7 @@ impl Timeline { broker_active_set: Arc, partial_backup_rate_limiter: RateLimiter, wal_backup: Arc, + wal_advertiser: Arc, ) { let (tx, rx) = self.manager_ctl.bootstrap_manager(); @@ -578,6 +574,7 @@ impl Timeline { rx, partial_backup_rate_limiter, wal_backup, + wal_advertiser, ) .await } diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 48eda92fed..77410e480d 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -22,7 +22,6 @@ use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, info, info_span, instrument, warn}; use utils::lsn::Lsn; -use crate::SafeKeeperConf; use crate::control_file::{FileStorage, Storage}; use crate::metrics::{ MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES, @@ -37,6 +36,7 @@ use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard}; use crate::timelines_set::{TimelineSetGuard, TimelinesSet}; use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle}; use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment}; +use crate::{SafeKeeperConf, wal_advertiser}; pub(crate) struct StateSnapshot { // inmem values @@ -201,6 +201,7 @@ pub(crate) struct Manager { pub(crate) wal_seg_size: usize, pub(crate) walsenders: Arc, pub(crate) wal_backup: Arc, + pub(crate) wal_advertiser: Arc, // current state pub(crate) state_version_rx: tokio::sync::watch::Receiver, @@ -240,6 +241,7 @@ pub async fn main_task( mut manager_rx: tokio::sync::mpsc::UnboundedReceiver, global_rate_limiter: RateLimiter, wal_backup: Arc, + wal_advertiser: Arc, ) { tli.set_status(Status::Started); @@ -259,6 +261,7 @@ pub async fn main_task( manager_tx, global_rate_limiter, wal_backup, + wal_advertiser, ) .await; @@ -287,7 +290,9 @@ pub async fn main_task( mgr.set_status(Status::UpdateBackup); let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await; - mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot); + + mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot); + mgr.update_wal_advertiser(&state_snapshot); mgr.set_status(Status::UpdateControlFile); mgr.update_control_file_save(&state_snapshot, &mut next_event) @@ -419,6 +424,7 @@ impl Manager { manager_tx: tokio::sync::mpsc::UnboundedSender, global_rate_limiter: RateLimiter, wal_backup: Arc, + wal_advertiser: Arc, ) -> Manager { let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await; Manager { @@ -428,6 +434,7 @@ impl Manager { state_version_rx: tli.get_state_version_rx(), num_computes_rx: tli.get_walreceivers().get_num_rx(), tli_broker_active: broker_active_set.guard(tli.clone()), + wal_advertiser: wal_advertiser.load_timeline(tli.clone()), last_removed_segno: 0, is_offloaded, backup_task: None, @@ -494,8 +501,8 @@ impl Manager { is_wal_backup_required } - /// Update is_active flag and returns its value. - fn update_is_active( + /// Update broker is_active flag and returns its value. + fn update_broker_active( &mut self, is_wal_backup_required: bool, num_computes: usize, @@ -522,6 +529,11 @@ impl Manager { .store(is_active, std::sync::atomic::Ordering::Relaxed); } + fn update_wal_advertiser(&mut self, state: &StateSnapshot) { + self.wal_advertiser.update_commit_lsn(state.commit_lsn); + // TODO: feed back monitoring info into Arc like we do for tli.broker_active in update_broker_active + } + /// Save control file if needed. Returns Instant if we should persist the control file in the future. async fn update_control_file_save( &self, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 774cfad531..83152a7529 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -24,6 +24,7 @@ use utils::shard::TenantShardId; use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; use crate::http::routes::DeleteOrExcludeError; use crate::rate_limit::RateLimiter; +use crate::receive_wal::WalAcceptor; use crate::state::TimelinePersistentState; use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir}; use crate::timelines_set::TimelinesSet; @@ -180,13 +181,7 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - let wal_advertiser = wal_advertiser.load_timeline(ttid); - match Timeline::load_timeline( - conf.clone(), - ttid, - wal_backup.clone(), - wal_advertiser, - ) { + match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) { Ok(tli) => { let mut shared_state = tli.write_shared_state().await; self.state @@ -200,6 +195,7 @@ impl GlobalTimelines { broker_active_set.clone(), partial_backup_rate_limiter.clone(), wal_backup.clone(), + wal_advertiser.clone(), ); } // If we can't load a timeline, it's most likely because of a corrupted @@ -325,11 +321,9 @@ impl GlobalTimelines { }; // Do the actual move and reflect the result in the map. - let wal_advertiser = wal_advertiser.load_timeline(ttid); match GlobalTimelines::install_temp_timeline( ttid, tmp_path, - wal_advertiser, conf.clone(), wal_backup.clone(), ) @@ -353,6 +347,7 @@ impl GlobalTimelines { broker_active_set, partial_backup_rate_limiter, wal_backup, + wal_advertiser.clone(), ); drop(timeline_shared_state); Ok(timeline) @@ -374,7 +369,6 @@ impl GlobalTimelines { async fn install_temp_timeline( ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, - wal_advertiser: Arc, conf: Arc, wal_backup: Arc, ) -> Result> { @@ -418,7 +412,7 @@ impl GlobalTimelines { // Do the move. durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?; - Timeline::load_timeline(conf, ttid, wal_backup, wal_advertiser) + Timeline::load_timeline(conf, ttid, wal_backup) } /// Get a timeline from the global map. If it's not present, it doesn't exist on disk, diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs index f43de8e146..855191a43d 100644 --- a/safekeeper/src/wal_advertiser/advmap.rs +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -13,6 +13,8 @@ use utils::{ shard::{ShardIndex, TenantShardId}, }; +use crate::timeline::Timeline; + #[derive(Default)] pub struct World { pageservers: RwLock>>, @@ -42,7 +44,7 @@ pub struct SafekeeperTimeline {} impl World { pub fn housekeeping(&self) {} - pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc { + pub fn load_timeline(&self, ttid: Arc) -> Arc { todo!() } pub fn update_pageserver_attachments( @@ -55,6 +57,9 @@ impl World { } impl SafekeeperTimeline { + pub fn update_commit_lsn(&self, commit_lsn: Lsn) { + todo!() + } pub fn get_pageserver_timeline( &self, ttld: TenantTimelineId,