Don't s3 offload from newly joined safekeeper not having required WAL.

I made the check at launcher level with the perspective of generally moving
election (decision who offloads) there.

Also log timeline 'active' changes.
This commit is contained in:
Arseny Sher
2022-06-09 14:15:41 +04:00
parent e22d9cee3a
commit a51b2dac9a
3 changed files with 118 additions and 70 deletions

View File

@@ -149,8 +149,12 @@ impl SharedState {
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(&mut self) -> bool {
self.active = self.is_active();
fn update_status(&mut self, ttid: ZTenantTimelineId) -> bool {
let is_active = self.is_active();
if self.active != is_active {
info!("timeline {} active={} now", ttid, is_active);
}
self.active = is_active;
self.is_wal_backup_action_pending()
}
@@ -187,6 +191,12 @@ impl SharedState {
self.wal_backup_active
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
fn can_wal_backup(&self) -> bool {
self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -291,7 +301,7 @@ impl Timeline {
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
@@ -308,7 +318,7 @@ impl Timeline {
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
@@ -327,7 +337,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.update_status();
shared_state.update_status(self.zttid);
return Ok(true);
}
}
@@ -341,6 +351,12 @@ impl Timeline {
shared_state.wal_backup_attend()
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
self.mutex.lock().unwrap().can_wal_backup()
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
@@ -509,7 +525,7 @@ impl Timeline {
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status();
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;

View File

@@ -49,14 +49,10 @@ pub fn wal_backup_launcher_thread_main(
});
}
/// Check whether wal backup is required for timeline and mark that launcher is
/// aware of current status (if timeline exists).
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli.wal_backup_attend()
} else {
false
}
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
}
struct WalBackupTaskHandle {
@@ -64,6 +60,56 @@ struct WalBackupTaskHandle {
handle: JoinHandle<()>,
}
struct WalBackupTimelineEntry {
timeline: Arc<Timeline>,
handle: Option<WalBackupTaskHandle>,
}
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
task: &mut WalBackupTimelineEntry,
) {
if !task.timeline.can_wal_backup() {
return;
}
info!("starting WAL backup task for {}", zttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
zttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
@@ -72,7 +118,7 @@ async fn wal_backup_launcher_main_loop(
mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
info!(
"WAL backup launcher: started, remote config {:?}",
"WAL backup launcher started, remote config {:?}",
conf.remote_storage
);
@@ -83,64 +129,50 @@ async fn wal_backup_launcher_main_loop(
})
});
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new();
// Presense in this map means launcher is aware s3 offloading is needed for
// the timeline, but task is started only if it makes sense for to offload
// from this safekeeper.
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
loop {
// channel is never expected to get closed
let zttid = wal_backup_launcher_rx.recv().await.unwrap();
let is_wal_backup_required = is_wal_backup_required(zttid);
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
// do we need to do anything at all?
if is_wal_backup_required != tasks.contains_key(&zttid) {
if is_wal_backup_required {
// need to start the task
info!("starting WAL backup task for {}", zttid);
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
zttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
tokio::select! {
zttid = wal_backup_launcher_rx.recv() => {
// channel is never expected to get closed
let zttid = zttid.unwrap();
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let timeline = is_wal_backup_required(zttid);
// do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&zttid) {
if let Some(timeline) = timeline {
// need to start the task
let entry = tasks.entry(zttid).or_insert(WalBackupTimelineEntry {
timeline,
handle: None,
});
consider_start_task(&conf, zttid, entry);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", zttid);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
tasks.insert(
zttid,
WalBackupTaskHandle {
shutdown_tx,
handle,
},
);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", zttid);
let wb_handle = tasks.remove(&zttid).unwrap();
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
// Hm, why I can't await on reference to handle?
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
let entry = tasks.remove(&zttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
}
}
}
}
}
// Start known tasks, if needed and possible.
_ = ticker.tick() => {
for (zttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *zttid, entry);
}
}
}

View File

@@ -42,8 +42,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
Repeat check for several tenants/timelines.
"""
env = neon_env_builder.init_start()
neon_env_builder.num_safekeepers = num_safekeepers
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
for _ in range(num_timelines):