WIP: plumbing to feed commit_lsn to wal_advertiser

This commit is contained in:
Christian Schwarz
2025-06-02 11:52:14 +02:00
parent 9ee75ceee6
commit 39039d1be7
5 changed files with 30 additions and 22 deletions

View File

@@ -108,7 +108,6 @@ impl Env {
&timeline_dir,
&remote_path,
shared_state,
todo!(),
conf.clone(),
wal_backup.clone(),
);
@@ -118,6 +117,7 @@ impl Env {
Arc::new(TimelinesSet::default()), // ignored for now
RateLimiter::new(0, 0),
wal_backup,
todo!(),
);
Ok(timeline)
}

View File

@@ -450,7 +450,6 @@ pub struct Timeline {
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: RwLock<SharedState>,
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
timeline_dir: Utf8PathBuf,
@@ -482,7 +481,6 @@ impl Timeline {
timeline_dir: &Utf8Path,
remote_path: &RemotePath,
shared_state: SharedState,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
wal_backup: Arc<WalBackup>,
) -> Arc<Self> {
@@ -507,7 +505,6 @@ impl Timeline {
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(shared_state),
wal_advertiser,
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
gate: Default::default(),
@@ -528,7 +525,6 @@ impl Timeline {
conf: Arc<SafeKeeperConf>,
ttid: TenantTimelineId,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -541,7 +537,6 @@ impl Timeline {
&timeline_dir,
&remote_path,
shared_state,
wal_advertiser,
conf,
wal_backup,
))
@@ -555,6 +550,7 @@ impl Timeline {
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();
@@ -578,6 +574,7 @@ impl Timeline {
rx,
partial_backup_rate_limiter,
wal_backup,
wal_advertiser,
)
.await
}

View File

@@ -22,7 +22,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::lsn::Lsn;
use crate::SafeKeeperConf;
use crate::control_file::{FileStorage, Storage};
use crate::metrics::{
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES,
@@ -37,6 +36,7 @@ use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
use crate::{SafeKeeperConf, wal_advertiser};
pub(crate) struct StateSnapshot {
// inmem values
@@ -201,6 +201,7 @@ pub(crate) struct Manager {
pub(crate) wal_seg_size: usize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
// current state
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
@@ -240,6 +241,7 @@ pub async fn main_task(
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
) {
tli.set_status(Status::Started);
@@ -259,6 +261,7 @@ pub async fn main_task(
manager_tx,
global_rate_limiter,
wal_backup,
wal_advertiser,
)
.await;
@@ -287,7 +290,9 @@ pub async fn main_task(
mgr.set_status(Status::UpdateBackup);
let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot);
mgr.update_wal_advertiser(&state_snapshot);
mgr.set_status(Status::UpdateControlFile);
mgr.update_control_file_save(&state_snapshot, &mut next_event)
@@ -419,6 +424,7 @@ impl Manager {
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::advmap::World>,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
@@ -428,6 +434,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.load_timeline(tli.clone()),
last_removed_segno: 0,
is_offloaded,
backup_task: None,
@@ -494,8 +501,8 @@ impl Manager {
is_wal_backup_required
}
/// Update is_active flag and returns its value.
fn update_is_active(
/// Update broker is_active flag and returns its value.
fn update_broker_active(
&mut self,
is_wal_backup_required: bool,
num_computes: usize,
@@ -522,6 +529,11 @@ impl Manager {
.store(is_active, std::sync::atomic::Ordering::Relaxed);
}
/// Pushes the commit LSN recorded in the given state snapshot to this
/// manager's `wal_advertiser` handle.
fn update_wal_advertiser(&mut self, state: &StateSnapshot) {
    let commit_lsn = state.commit_lsn;
    let advertiser = &self.wal_advertiser;
    advertiser.update_commit_lsn(commit_lsn);
    // TODO: feed back monitoring info into Arc<Timeline> like we do for tli.broker_active in update_broker_active
}
/// Save control file if needed. Returns Instant if we should persist the control file in the future.
async fn update_control_file_save(
&self,

View File

@@ -24,6 +24,7 @@ use utils::shard::TenantShardId;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalAcceptor;
use crate::state::TimelinePersistentState;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
@@ -180,13 +181,7 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match Timeline::load_timeline(
conf.clone(),
ttid,
wal_backup.clone(),
wal_advertiser,
) {
match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
@@ -200,6 +195,7 @@ impl GlobalTimelines {
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
wal_backup.clone(),
wal_advertiser.clone(),
);
}
// If we can't load a timeline, it's most likely because of a corrupted
@@ -325,11 +321,9 @@ impl GlobalTimelines {
};
// Do the actual move and reflect the result in the map.
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match GlobalTimelines::install_temp_timeline(
ttid,
tmp_path,
wal_advertiser,
conf.clone(),
wal_backup.clone(),
)
@@ -353,6 +347,7 @@ impl GlobalTimelines {
broker_active_set,
partial_backup_rate_limiter,
wal_backup,
wal_advertiser.clone(),
);
drop(timeline_shared_state);
Ok(timeline)
@@ -374,7 +369,6 @@ impl GlobalTimelines {
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
wal_backup: Arc<WalBackup>,
) -> Result<Arc<Timeline>> {
@@ -418,7 +412,7 @@ impl GlobalTimelines {
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid, wal_backup, wal_advertiser)
Timeline::load_timeline(conf, ttid, wal_backup)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,

View File

@@ -13,6 +13,8 @@ use utils::{
shard::{ShardIndex, TenantShardId},
};
use crate::timeline::Timeline;
#[derive(Default)]
pub struct World {
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
@@ -42,7 +44,7 @@ pub struct SafekeeperTimeline {}
impl World {
/// Housekeeping hook. The body is currently empty, so calling it does nothing.
pub fn housekeeping(&self) {}
pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc<SafekeeperTimeline> {
pub fn load_timeline(&self, ttid: Arc<Timeline>) -> Arc<SafekeeperTimeline> {
todo!()
}
pub fn update_pageserver_attachments(
@@ -55,6 +57,9 @@ impl World {
}
impl SafekeeperTimeline {
/// Presumably meant to record `commit_lsn` as this timeline's latest commit
/// LSN (TODO confirm once implemented).
///
/// # Panics
///
/// Not implemented yet: the body is `todo!()`, so every call panics.
pub fn update_commit_lsn(&self, commit_lsn: Lsn) {
todo!()
}
pub fn get_pageserver_timeline(
&self,
ttld: TenantTimelineId,