WIP(ctd): plumbing to feed commit_lsn to wal_advertiser

This commit is contained in:
Christian Schwarz
2025-06-02 12:39:42 +02:00
parent 18a43eeab3
commit df36b9aa62
2 changed files with 49 additions and 13 deletions

View File

@@ -201,7 +201,7 @@ pub(crate) struct Manager {
pub(crate) wal_seg_size: usize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
pub(crate) wal_advertiser: wal_advertiser::advmap::SafekeeperTimelineHandle,
// current state
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
@@ -292,7 +292,6 @@ pub async fn main_task(
let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot);
mgr.update_wal_advertiser(&state_snapshot);
mgr.set_status(Status::UpdateControlFile);
mgr.update_control_file_save(&state_snapshot, &mut next_event)
@@ -434,7 +433,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.load_timeline(tli.clone()),
wal_advertiser: wal_advertiser.spawn(tli.clone()),
last_removed_segno: 0,
is_offloaded,
backup_task: None,
@@ -529,11 +528,6 @@ impl Manager {
.store(is_active, std::sync::atomic::Ordering::Relaxed);
}
fn update_wal_advertiser(&mut self, state: &StateSnapshot) {
self.wal_advertiser.update_commit_lsn(state.commit_lsn);
// TODO: feed back monitoring info into Arc<Timeline> like we do for tli.broker_active in update_broker_active
}
/// Save control file if needed. Returns Instant if we should persist the control file in the future.
async fn update_control_file_save(
&self,

View File

@@ -11,6 +11,7 @@ use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::{ShardIndex, TenantShardId},
sync::gate::GateGuard,
};
use crate::timeline::Timeline;
@@ -33,19 +34,31 @@ pub struct PageserverAttachment {
}
pub struct PageserverTimeline {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
remote_consistent_lsn: RwLock<Lsn>,
pageserver: Arc<Pageserver>,
}
pub struct SafekeeperTimeline {}
pub struct SafekeeperTimelineHandle {}
struct SafekeeperTimeline {
tli: Arc<Timeline>,
pageserver_shards: RwLock<HashMap<TenantShardId, Arc<PageserverTimeline>>>,
}
impl World {
pub fn housekeeping(&self) {}
pub fn load_timeline(&self, ttid: Arc<Timeline>) -> Arc<SafekeeperTimeline> {
todo!()
pub fn spawn(&self, tli: Arc<Timeline>) -> SafekeeperTimelineHandle {
tokio::spawn(async move {
SafekeeperTimeline {
tli,
pageserver_shards: todo!(),
}
.run()
.await;
});
SafekeeperTimelineHandle {}
}
pub fn update_pageserver_attachments(
&self,
@@ -57,7 +70,36 @@ impl World {
}
impl SafekeeperTimeline {
pub fn update_commit_lsn(&self, commit_lsn: Lsn) {
async fn run(self) {
let Ok(gate_guard) = self.tli.gate.enter() else {
return;
};
let cancel = self.tli.cancel.child_token();
let mut commit_lsn_rx = self.tli.get_commit_lsn_watch_rx();
let ttid = self.tli.ttid;
loop {
let commit_lsn = *commit_lsn_rx.borrow_and_update();
{
let guard = self.pageserver_shards.read().unwrap();
for (shard, ps_tl) in guard.iter() {
if *ps_tl.remote_consistent_lsn.read() < commit_lsn {
ps_tl.pageserver.advertise(ps_tl);
}
}
}
tokio::select! {
_ = cancel.cancelled() => { return; }
// TODO: debounce changed notifications, one every second or so is easily enough.
commit_lsn = commit_lsn_rx.changed() => { continue; }
};
}
drop(gate_guard);
}
}
impl Pageserver {
fn advertise(&self, ps_tl: &Arc<PageserverTimeline>) {
todo!()
}
}