From df36b9aa6242d620ffc30dd2ed086f4a98d17430 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 2 Jun 2025 12:39:42 +0200 Subject: [PATCH] WIP(ctd): plumbing to feed commit_lsn to wal_advertiser --- safekeeper/src/timeline_manager.rs | 10 +---- safekeeper/src/wal_advertiser/advmap.rs | 52 ++++++++++++++++++++++--- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 77410e480d..82c44e1fd0 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -201,7 +201,7 @@ pub(crate) struct Manager { pub(crate) wal_seg_size: usize, pub(crate) walsenders: Arc, pub(crate) wal_backup: Arc, - pub(crate) wal_advertiser: Arc, + pub(crate) wal_advertiser: wal_advertiser::advmap::SafekeeperTimelineHandle, // current state pub(crate) state_version_rx: tokio::sync::watch::Receiver, @@ -292,7 +292,6 @@ pub async fn main_task( let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await; mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot); - mgr.update_wal_advertiser(&state_snapshot); mgr.set_status(Status::UpdateControlFile); mgr.update_control_file_save(&state_snapshot, &mut next_event) @@ -434,7 +433,7 @@ impl Manager { state_version_rx: tli.get_state_version_rx(), num_computes_rx: tli.get_walreceivers().get_num_rx(), tli_broker_active: broker_active_set.guard(tli.clone()), - wal_advertiser: wal_advertiser.load_timeline(tli.clone()), + wal_advertiser: wal_advertiser.spawn(tli.clone()), last_removed_segno: 0, is_offloaded, backup_task: None, @@ -529,11 +528,6 @@ impl Manager { .store(is_active, std::sync::atomic::Ordering::Relaxed); } - fn update_wal_advertiser(&mut self, state: &StateSnapshot) { - self.wal_advertiser.update_commit_lsn(state.commit_lsn); - // TODO: feed back monitoring info into Arc like we do for tli.broker_active in update_broker_active - } - /// Save control file if needed. Returns Instant if we should persist the control file in the future. async fn update_control_file_save( &self, diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs index 10258feaff..a26a245859 100644 --- a/safekeeper/src/wal_advertiser/advmap.rs +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -11,6 +11,7 @@ use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, shard::{ShardIndex, TenantShardId}, + sync::gate::GateGuard, }; use crate::timeline::Timeline; @@ -33,19 +34,31 @@ pub struct PageserverAttachment { } pub struct PageserverTimeline { - pageserver: NodeId, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, remote_consistent_lsn: RwLock, + pageserver: Arc, } -pub struct SafekeeperTimeline {} +pub struct SafekeeperTimelineHandle {} +struct SafekeeperTimeline { + tli: Arc, + pageserver_shards: RwLock>>, +} impl World { pub fn housekeeping(&self) {} - pub fn load_timeline(&self, ttid: Arc) -> Arc { - todo!() + pub fn spawn(&self, tli: Arc) -> SafekeeperTimelineHandle { + tokio::spawn(async move { + SafekeeperTimeline { + tli, + pageserver_shards: todo!(), + } + .run() + .await; + }); + SafekeeperTimelineHandle {} } pub fn update_pageserver_attachments( &self, @@ -57,7 +70,36 @@ impl World { } impl SafekeeperTimeline { - pub fn update_commit_lsn(&self, commit_lsn: Lsn) { + async fn run(self) { + let Ok(gate_guard) = self.tli.gate.enter() else { + return; + }; + let cancel = self.tli.cancel.child_token(); + let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx(); + let ttid = self.tli.ttid; + loop { + let commit_lsn = *commit_lsn_rx.borrow_and_update(); + { + let guard = self.pageserver_shards.read().unwrap(); + for (shard, ps_tl) in guard.iter() { + if *ps_tl.remote_consistent_lsn.read() < commit_lsn { + ps_tl.pageserver.advertise(ps_tl); + } + } + } + + tokio::select! { + _ = cancel.cancelled() => { return; } + // TODO: debounce changed notifications, one every second or so is easily enough. + commit_lsn = commit_lsn_rx.changed() => { continue; } + }; + } + drop(gate_guard); + } +} + +impl Pageserver { + fn advertise(&self, ps_tl: &Arc) { todo!() } }