diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index a69dadb7bb..42bb02c1ea 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -149,8 +149,12 @@ impl SharedState { /// Mark timeline active/inactive and return whether s3 offloading requires /// start/stop action. - fn update_status(&mut self) -> bool { - self.active = self.is_active(); + fn update_status(&mut self, ttid: ZTenantTimelineId) -> bool { + let is_active = self.is_active(); + if self.active != is_active { + info!("timeline {} active={} now", ttid, is_active); + } + self.active = is_active; self.is_wal_backup_action_pending() } @@ -187,6 +191,12 @@ impl SharedState { self.wal_backup_active } + // Can this safekeeper offload to s3? Recently joined safekeepers might not + // have necessary WAL. + fn can_wal_backup(&self) -> bool { + self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn + } + fn get_wal_seg_size(&self) -> usize { self.sk.state.server.wal_seg_size as usize } @@ -291,7 +301,7 @@ impl Timeline { { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes += 1; - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if offloading not started yet. if is_wal_backup_action_pending { @@ -308,7 +318,7 @@ impl Timeline { { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes -= 1; - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { @@ -327,7 +337,7 @@ impl Timeline { (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. 
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { - shared_state.update_status(); + shared_state.update_status(self.zttid); return Ok(true); } } @@ -341,6 +351,12 @@ impl Timeline { shared_state.wal_backup_attend() } + // Can this safekeeper offload to s3? Recently joined safekeepers might not + // have necessary WAL. + pub fn can_wal_backup(&self) -> bool { + self.mutex.lock().unwrap().can_wal_backup() + } + /// Deactivates the timeline, assuming it is being deleted. /// Returns whether the timeline was already active. /// @@ -509,7 +525,7 @@ impl Timeline { } shared_state.sk.record_safekeeper_info(sk_info)?; self.notify_wal_senders(&mut shared_state); - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 08e19f3f2f..1d7c8de3b8 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -49,14 +49,10 @@ pub fn wal_backup_launcher_thread_main( }); } -/// Check whether wal backup is required for timeline and mark that launcher is -/// aware of current status (if timeline exists). -fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool { - if let Some(tli) = GlobalTimelines::get_loaded(zttid) { - tli.wal_backup_attend() - } else { - false - } +/// Check whether wal backup is required for timeline. If yes, mark that launcher is +/// aware of current status and return the timeline. 
+fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option> { + GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend()) } struct WalBackupTaskHandle { @@ -64,6 +60,56 @@ struct WalBackupTaskHandle { handle: JoinHandle<()>, } +struct WalBackupTimelineEntry { + timeline: Arc, + handle: Option, +} + +/// Start per timeline task, if it makes sense for this safekeeper to offload. +fn consider_start_task( + conf: &SafeKeeperConf, + zttid: ZTenantTimelineId, + task: &mut WalBackupTimelineEntry, +) { + if !task.timeline.can_wal_backup() { + return; + } + info!("starting WAL backup task for {}", zttid); + + // TODO: decide who should offload right here by simply checking current + // state instead of running elections in offloading task. + let election_name = SubscriptionKey { + cluster_prefix: conf.broker_etcd_prefix.clone(), + kind: SubscriptionKind::Operation( + zttid, + NodeKind::Safekeeper, + OperationKind::Safekeeper(SkOperationKind::WalBackup), + ), + } + .watch_key(); + let my_candidate_name = broker::get_candiate_name(conf.my_id); + let election = broker::Election::new( + election_name, + my_candidate_name, + conf.broker_endpoints.clone(), + ); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&zttid); + + let handle = tokio::spawn( + backup_task_main(zttid, timeline_dir, shutdown_rx, election) + .instrument(info_span!("WAL backup task", zttid = %zttid)), + ); + + task.handle = Some(WalBackupTaskHandle { + shutdown_tx, + handle, + }); +} + +const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; + /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup /// tasks. Having this in separate task simplifies locking, allows to reap /// panics and separate elections from offloading itself. 
@@ -72,7 +118,7 @@ async fn wal_backup_launcher_main_loop( mut wal_backup_launcher_rx: Receiver, ) { info!( - "WAL backup launcher: started, remote config {:?}", + "WAL backup launcher started, remote config {:?}", conf.remote_storage ); @@ -83,64 +129,50 @@ async fn wal_backup_launcher_main_loop( }) }); - let mut tasks: HashMap = HashMap::new(); + // Presence in this map means launcher is aware s3 offloading is needed for + // the timeline, but task is started only if it makes sense to offload + // from this safekeeper. + let mut tasks: HashMap = HashMap::new(); + let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC)); loop { - // channel is never expected to get closed - let zttid = wal_backup_launcher_rx.recv().await.unwrap(); - let is_wal_backup_required = is_wal_backup_required(zttid); - if conf.remote_storage.is_none() || !conf.wal_backup_enabled { - continue; /* just drain the channel and do nothing */ - } - // do we need to do anything at all? - if is_wal_backup_required != tasks.contains_key(&zttid) { - if is_wal_backup_required { - // need to start the task - info!("starting WAL backup task for {}", zttid); - - // TODO: decide who should offload in launcher itself by simply checking current state - let election_name = SubscriptionKey { - cluster_prefix: conf.broker_etcd_prefix.clone(), - kind: SubscriptionKind::Operation( - zttid, - NodeKind::Safekeeper, - OperationKind::Safekeeper(SkOperationKind::WalBackup), - ), + tokio::select! 
{ + zttid = wal_backup_launcher_rx.recv() => { + // channel is never expected to get closed + let zttid = zttid.unwrap(); + if conf.remote_storage.is_none() || !conf.wal_backup_enabled { + continue; /* just drain the channel and do nothing */ } - .watch_key(); - let my_candidate_name = broker::get_candiate_name(conf.my_id); - let election = broker::Election::new( - election_name, - my_candidate_name, - conf.broker_endpoints.clone(), - ); + let timeline = is_wal_backup_required(zttid); + // do we need to do anything at all? + if timeline.is_some() != tasks.contains_key(&zttid) { + if let Some(timeline) = timeline { + // need to start the task + let entry = tasks.entry(zttid).or_insert(WalBackupTimelineEntry { + timeline, + handle: None, + }); + consider_start_task(&conf, zttid, entry); + } else { + // need to stop the task + info!("stopping WAL backup task for {}", zttid); - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&zttid); - - let handle = tokio::spawn( - backup_task_main(zttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", zttid = %zttid)), - ); - - tasks.insert( - zttid, - WalBackupTaskHandle { - shutdown_tx, - handle, - }, - ); - } else { - // need to stop the task - info!("stopping WAL backup task for {}", zttid); - - let wb_handle = tasks.remove(&zttid).unwrap(); - // Tell the task to shutdown. Error means task exited earlier, that's ok. - let _ = wb_handle.shutdown_tx.send(()).await; - // Await the task itself. TODO: restart panicked tasks earlier. - // Hm, why I can't await on reference to handle? - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", zttid, e); + let entry = tasks.remove(&zttid).unwrap(); + if let Some(wb_handle) = entry.handle { + // Tell the task to shutdown. Error means task exited earlier, that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. 
+ if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", zttid, e); + } + } + } + } + } + // Start known tasks, if needed and possible. + _ = ticker.tick() => { + for (zttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { + consider_start_task(&conf, *zttid, entry); } } } diff --git a/test_runner/batch_others/test_normal_work.py b/test_runner/batch_others/test_normal_work.py index c0f44ce7a9..4635a70de6 100644 --- a/test_runner/batch_others/test_normal_work.py +++ b/test_runner/batch_others/test_normal_work.py @@ -42,8 +42,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s Repeat check for several tenants/timelines. """ - env = neon_env_builder.init_start() neon_env_builder.num_safekeepers = num_safekeepers + env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() for _ in range(num_timelines):