From a42eba3cd72beadc6e02647bd5be0f27fff49911 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Fri, 27 May 2022 13:09:17 +0400
Subject: [PATCH] S3 WAL offloading staging review.

- Uncomment the accidentally commented-out `self.keep_alive.abort()` line;
  because of it the keep-alive task never finished, which blocked the
  launcher.
- Rework initialization one more time to stop the offloader from trying to
  back up segment 0. All required LSNs are now initialized in
  handle_elected, where we learn the start LSN for the first time.
- Fix the earlier blind attempt to pass remote storage params to the
  safekeeper service file.
---
 .circleci/ansible/systemd/safekeeper.service |  2 +-
 libs/utils/src/zid.rs                        |  2 +-
 safekeeper/src/broker.rs                     |  2 +-
 safekeeper/src/safekeeper.rs                 | 50 +++++++++-----------
 safekeeper/src/wal_backup.rs                 | 19 ++++----
 5 files changed, 35 insertions(+), 40 deletions(-)

diff --git a/.circleci/ansible/systemd/safekeeper.service b/.circleci/ansible/systemd/safekeeper.service
index a6b443c3e7..e4a395a60e 100644
--- a/.circleci/ansible/systemd/safekeeper.service
+++ b/.circleci/ansible/systemd/safekeeper.service
@@ -6,7 +6,7 @@ After=network.target auditd.service
 Type=simple
 User=safekeeper
 Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
-ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote_storage='{bucket_name={{bucket_name}}, bucket_region={{bucket_region}}, prefix_in_bucket=wal}'
+ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
 ExecReload=/bin/kill -HUP $MAINPID
 KillMode=mixed
 KillSignal=SIGINT
diff --git a/libs/utils/src/zid.rs b/libs/utils/src/zid.rs
index 02f781c49a..0ef174da4d 100644
--- a/libs/utils/src/zid.rs
+++ b/libs/utils/src/zid.rs
@@ -218,7 +218,7 @@ impl ZTenantTimelineId {
 
 impl fmt::Display for ZTenantTimelineId {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}-{}", self.tenant_id, self.timeline_id)
+        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
     }
 }
 
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index 676719b60d..5bcb197205 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -83,7 +83,7 @@ impl ElectionLeader {
     }
 
     pub async fn give_up(self) {
-        // self.keep_alive.abort();
+        self.keep_alive.abort();
         // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
         // should we await for keep alive termination?
         let _ = self.keep_alive.await;
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 9a07127771..0a7adb96b6 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -731,24 +731,36 @@ where
     {
         let mut state = self.state.clone();
 
-        // Remeber point where WAL begins globally, if not yet.
+        // Here we learn the initial LSN for the first time, so set the
+        // fields that depend on it.
+        if state.timeline_start_lsn == Lsn(0) {
+            // Remember point where WAL begins globally.
             state.timeline_start_lsn = msg.timeline_start_lsn;
             info!(
                 "setting timeline_start_lsn to {:?}",
                 state.timeline_start_lsn
            );
-        }
-        // Remember point where WAL begins locally, if not yet. (I doubt the
-        // second condition is ever possible)
-        if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
             state.local_start_lsn = msg.start_streaming_at;
             info!("setting local_start_lsn to {:?}", state.local_start_lsn);
         }
+        // Initializing commit_lsn before acking the first flushed record is
+        // important to let find_end_of_wal skip the hole in the beginning
+        // of the first segment.
+        //
+        // NB: on new clusters this happens at the same time as
+        // timeline_start_lsn initialization; it is kept outside the block
+        // above so that existing (upgraded) clusters get it too.
+        self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
+        self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
+        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
+
+        // Initializing backup_lsn keeps the backup task from thinking it should upload segment 0.
+        self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
 
         state.acceptor_state.term_history = msg.term_history.clone();
-        self.state.persist(&state)?;
+        self.persist_control_file(state)?;
     }
 
     info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -764,14 +776,6 @@ where
         self.inmem.commit_lsn = commit_lsn;
         self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
 
-        // We got our first commit_lsn, which means we should sync
-        // everything to disk, to initialize the state.
-        if self.state.commit_lsn == Lsn::INVALID && commit_lsn != Lsn::INVALID {
-            self.inmem.backup_lsn = self.inmem.commit_lsn; // initialize backup_lsn
-            self.wal_store.flush_wal()?;
-            self.persist_control_file()?;
-        }
-
         // If new commit_lsn reached epoch switch, force sync of control
         // file: walproposer in sync mode is very interested when this
         // happens. Note: this is for sync-safekeepers mode only, as
@@ -780,15 +784,14 @@ where
         // that we receive new epoch_start_lsn, and we still need to sync
         // control file in this case.
         if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
-            self.persist_control_file()?;
+            self.persist_control_file(self.state.clone())?;
         }
 
         Ok(())
     }
 
-    /// Persist in-memory state to the disk.
-    fn persist_control_file(&mut self) -> Result<()> {
-        let mut state = self.state.clone();
+    /// Persist in-memory state to the disk, taking the remaining fields from the passed state.
+    fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
         state.commit_lsn = self.inmem.commit_lsn;
         state.backup_lsn = self.inmem.backup_lsn;
         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
@@ -823,13 +826,6 @@ where
         // do the job
         if !msg.wal_data.is_empty() {
             self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
-
-            // If this was the first record we ever received, initialize
-            // commit_lsn to help find_end_of_wal skip the hole in the
-            // beginning.
- if self.global_commit_lsn == Lsn(0) { - self.global_commit_lsn = msg.h.begin_lsn; - } } // flush wal to the disk, if required @@ -852,7 +848,7 @@ where if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) < self.inmem.peer_horizon_lsn { - self.persist_control_file()?; + self.persist_control_file(self.state.clone())?; } trace!( @@ -920,7 +916,7 @@ where self.inmem.peer_horizon_lsn = new_peer_horizon_lsn; } if sync_control_file { - self.persist_control_file()?; + self.persist_control_file(self.state.clone())?; } Ok(()) } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index ef8ebe14e1..83dc312d28 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -71,7 +71,7 @@ async fn wal_backup_launcher_main_loop( mut wal_backup_launcher_rx: Receiver, ) { info!( - "wal backup launcher started, remote config {:?}", + "WAL backup launcher: started, remote config {:?}", conf.remote_storage ); @@ -95,7 +95,7 @@ async fn wal_backup_launcher_main_loop( if is_wal_backup_required != tasks.contains_key(&zttid) { if is_wal_backup_required { // need to start the task - info!("starting wal backup task for {}", zttid); + info!("starting WAL backup task for {}", zttid); // TODO: decide who should offload in launcher itself by simply checking current state let election_name = broker::get_campaign_name( @@ -115,7 +115,7 @@ async fn wal_backup_launcher_main_loop( let handle = tokio::spawn( backup_task_main(zttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup", zttid = %zttid)), + .instrument(info_span!("WAL backup task", zttid = %zttid)), ); tasks.insert( @@ -127,7 +127,7 @@ async fn wal_backup_launcher_main_loop( ); } else { // need to stop the task - info!("stopping wal backup task for {}", zttid); + info!("stopping WAL backup task for {}", zttid); let wb_handle = tasks.remove(&zttid).unwrap(); // Tell the task to shutdown. Error means task exited earlier, that's ok. @@ -236,20 +236,19 @@ impl WalBackupTask { } let commit_lsn = *self.commit_lsn_watch_rx.borrow(); - assert!( - commit_lsn >= backup_lsn, - "backup lsn should never pass commit lsn" - ); + // Note that backup_lsn can be higher than commit_lsn if we + // don't have much local WAL and others already uploaded + // segments we don't even have. if backup_lsn.segment_number(self.wal_seg_size) - == commit_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) { continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ } // Perhaps peers advanced the position, check shmem value. backup_lsn = self.timeline.get_wal_backup_lsn(); if backup_lsn.segment_number(self.wal_seg_size) - == commit_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) { continue; }
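Reviewer note, not part of the patch: a minimal self-contained sketch of the relaxed skip check in the backup loop above. `segment_number` and `backup_caught_up` are illustrative stand-ins for `Lsn::segment_number()` and the inline comparison in the WalBackupTask loop; the point is that once peers may offload segments we never stored locally, backup_lsn can legitimately run ahead of commit_lsn, so the check has to be `>=` rather than `==` (and the removed assert had to go).

    // Hypothetical helpers mirroring Lsn::segment_number(); names are illustrative only.
    fn segment_number(lsn: u64, wal_seg_size: u64) -> u64 {
        lsn / wal_seg_size
    }

    // True when there is no segment left to upload, i.e. backup has caught up
    // with (or passed) the commit horizon at segment granularity.
    fn backup_caught_up(backup_lsn: u64, commit_lsn: u64, wal_seg_size: u64) -> bool {
        segment_number(backup_lsn, wal_seg_size) >= segment_number(commit_lsn, wal_seg_size)
    }

    fn main() {
        const SEG: u64 = 16 * 1024 * 1024; // default 16 MiB WAL segment

        // Peers already offloaded up through segment 2 while our commit_lsn is
        // still inside segment 2: backup_lsn > commit_lsn, nothing to do
        // (the removed assert would have fired here).
        assert!(backup_caught_up(3 * SEG, 2 * SEG + 100, SEG));

        // Backup reached only segment 1 while commit_lsn is in segment 2: upload needed.
        assert!(!backup_caught_up(SEG, 2 * SEG + 100, SEG));
    }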