diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs index 8b4e618661..7cc53edeb0 100644 --- a/walkeeper/src/control_file.rs +++ b/walkeeper/src/control_file.rs @@ -6,6 +6,7 @@ use lazy_static::lazy_static; use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; +use std::ops::Deref; use std::path::{Path, PathBuf}; use tracing::*; @@ -37,8 +38,10 @@ lazy_static! { .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); } -pub trait Storage { - /// Persist safekeeper state on disk. +/// Storage should keep actual state inside of it. It should implement Deref +/// trait to access state fields and have persist method for updating that state. +pub trait Storage: Deref<Target = SafeKeeperState> { + /// Persist safekeeper state on disk and update internal state. fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; } @@ -48,19 +51,47 @@ pub struct FileStorage { timeline_dir: PathBuf, conf: SafeKeeperConf, persist_control_file_seconds: Histogram, + + /// Last state persisted to disk. 
+ state: SafeKeeperState, } impl FileStorage { - pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { + pub fn restore_new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> { let timeline_dir = conf.timeline_dir(zttid); let tenant_id = zttid.tenant_id.to_string(); let timeline_id = zttid.timeline_id.to_string(); - FileStorage { + + let state = Self::load_control_file_conf(conf, zttid)?; + + Ok(FileStorage { timeline_dir, conf: conf.clone(), persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS .with_label_values(&[&tenant_id, &timeline_id]), - } + state, + }) + } + + pub fn create_new( + zttid: &ZTenantTimelineId, + conf: &SafeKeeperConf, + state: SafeKeeperState, + ) -> Result<FileStorage> { + let timeline_dir = conf.timeline_dir(zttid); + let tenant_id = zttid.tenant_id.to_string(); + let timeline_id = zttid.timeline_id.to_string(); + + let mut store = FileStorage { + timeline_dir, + conf: conf.clone(), + persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS + .with_label_values(&[&tenant_id, &timeline_id]), + state: state.clone(), + }; + + store.persist(&state)?; + Ok(store) } // Check the magic/version in the on-disk data and deserialize it, if possible. 
@@ -141,6 +172,14 @@ impl FileStorage { } } +impl Deref for FileStorage { + type Target = SafeKeeperState; + + fn deref(&self) -> &Self::Target { + &self.state + } +} + impl Storage for FileStorage { // persists state durably to underlying storage // for description see https://lwn.net/Articles/457667/ @@ -201,6 +240,9 @@ impl Storage for FileStorage { .and_then(|f| f.sync_all()) .context("failed to sync control file directory")?; } + + // update internal state + self.state = s.clone(); Ok(()) } } @@ -228,7 +270,7 @@ mod test { ) -> Result<(FileStorage, SafeKeeperState)> { fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); Ok(( - FileStorage::new(zttid, conf), + FileStorage::restore_new(zttid, conf)?, FileStorage::load_control_file_conf(conf, zttid)?, )) } @@ -239,8 +281,7 @@ mod test { ) -> Result<(FileStorage, SafeKeeperState)> { fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); let state = SafeKeeperState::empty(); - let mut storage = FileStorage::new(zttid, conf); - storage.persist(&state)?; + let storage = FileStorage::create_new(zttid, conf, state.clone())?; Ok((storage, state)) } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 1e23d87b34..22a8481e45 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -210,6 +210,7 @@ pub struct SafekeeperMemState { pub s3_wal_lsn: Lsn, // TODO: keep only persistent version pub peer_horizon_lsn: Lsn, pub remote_consistent_lsn: Lsn, + pub proposer_uuid: PgUuid, } impl SafeKeeperState { @@ -502,9 +503,8 @@ pub struct SafeKeeper { epoch_start_lsn: Lsn, pub inmem: SafekeeperMemState, // in memory part - pub s: SafeKeeperState, // persistent part + pub state: CTRL, // persistent state storage - pub control_store: CTRL, pub wal_store: WAL, } @@ -516,14 +516,14 @@ where // constructor pub fn new( ztli: ZTimelineId, - control_store: CTRL, + state: CTRL, mut wal_store: WAL, - state: SafeKeeperState, ) -> 
Result<SafeKeeper<CTRL, WAL>> { if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id { bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id); } + // initialize wal_store, if state is already initialized wal_store.init_storage(&state)?; Ok(SafeKeeper { @@ -535,23 +535,25 @@ where s3_wal_lsn: state.s3_wal_lsn, peer_horizon_lsn: state.peer_horizon_lsn, remote_consistent_lsn: state.remote_consistent_lsn, + proposer_uuid: state.proposer_uuid, }, - s: state, - control_store, + state, wal_store, }) } /// Get history of term switches for the available WAL fn get_term_history(&self) -> TermHistory { - self.s + self.state .acceptor_state .term_history .up_to(self.wal_store.flush_lsn()) } pub fn get_epoch(&self) -> Term { - self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn()) + self.state + .acceptor_state + .get_epoch(self.wal_store.flush_lsn()) } /// Process message from proposer and possibly form reply. Concurrent @@ -587,46 +589,47 @@ ); } /* Postgres upgrade is not treated as fatal error */ - if msg.pg_version != self.s.server.pg_version - && self.s.server.pg_version != UNKNOWN_SERVER_VERSION + if msg.pg_version != self.state.server.pg_version + && self.state.server.pg_version != UNKNOWN_SERVER_VERSION { info!( "incompatible server version {}, expected {}", - msg.pg_version, self.s.server.pg_version + msg.pg_version, self.state.server.pg_version ); } - if msg.tenant_id != self.s.tenant_id { + if msg.tenant_id != self.state.tenant_id { bail!( "invalid tenant ID, got {}, expected {}", msg.tenant_id, - self.s.tenant_id + self.state.tenant_id ); } - if msg.ztli != self.s.timeline_id { + if msg.ztli != self.state.timeline_id { bail!( "invalid timeline ID, got {}, expected {}", msg.ztli, - self.s.timeline_id + self.state.timeline_id ); } // set basic info about server, if not yet // TODO: verify that is doesn't change after - self.s.server.system_id = msg.system_id; - 
self.s.server.wal_seg_size = msg.wal_seg_size; - self.control_store - .persist(&self.s) - .context("failed to persist shared state")?; + { + let mut state = self.state.clone(); + state.server.system_id = msg.system_id; + state.server.wal_seg_size = msg.wal_seg_size; + self.state.persist(&state)?; + } // pass wal_seg_size to read WAL and find flush_lsn - self.wal_store.init_storage(&self.s)?; + self.wal_store.init_storage(&self.state)?; info!( "processed greeting from proposer {:?}, sending term {:?}", - msg.proposer_id, self.s.acceptor_state.term + msg.proposer_id, self.state.acceptor_state.term ); Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { - term: self.s.acceptor_state.term, + term: self.state.acceptor_state.term, }))) } @@ -637,17 +640,19 @@ where ) -> Result<Option<AcceptorProposerMessage>> { // initialize with refusal let mut resp = VoteResponse { - term: self.s.acceptor_state.term, + term: self.state.acceptor_state.term, vote_given: false as u64, flush_lsn: self.wal_store.flush_lsn(), - truncate_lsn: self.s.peer_horizon_lsn, + truncate_lsn: self.state.peer_horizon_lsn, term_history: self.get_term_history(), }; - if self.s.acceptor_state.term < msg.term { - self.s.acceptor_state.term = msg.term; + if self.state.acceptor_state.term < msg.term { + let mut state = self.state.clone(); + state.acceptor_state.term = msg.term; // persist vote before sending it out - self.control_store.persist(&self.s)?; - resp.term = self.s.acceptor_state.term; + self.state.persist(&state)?; + + resp.term = self.state.acceptor_state.term; resp.vote_given = true as u64; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); @@ -656,9 +661,10 @@ where /// Bump our term if received a note from elected proposer with higher one fn bump_if_higher(&mut self, term: Term) -> Result<()> { - if self.s.acceptor_state.term < term { - self.s.acceptor_state.term = term; - self.control_store.persist(&self.s)?; + if self.state.acceptor_state.term < term { + let mut state = self.state.clone(); + 
state.acceptor_state.term = term; + self.state.persist(&state)?; } Ok(()) } @@ -666,9 +672,9 @@ where /// Form AppendResponse from current state. fn append_response(&self) -> AppendResponse { let ar = AppendResponse { - term: self.s.acceptor_state.term, + term: self.state.acceptor_state.term, flush_lsn: self.wal_store.flush_lsn(), - commit_lsn: self.s.commit_lsn, + commit_lsn: self.state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), zenith_feedback: ZenithFeedback::empty(), @@ -681,7 +687,7 @@ where info!("received ProposerElected {:?}", msg); self.bump_if_higher(msg.term)?; // If our term is higher, ignore the message (next feedback will inform the compute) - if self.s.acceptor_state.term > msg.term { + if self.state.acceptor_state.term > msg.term { return Ok(None); } @@ -692,8 +698,11 @@ where self.wal_store.truncate_wal(msg.start_streaming_at)?; // and now adopt term history from proposer - self.s.acceptor_state.term_history = msg.term_history.clone(); - self.control_store.persist(&self.s)?; + { + let mut state = self.state.clone(); + state.acceptor_state.term_history = msg.term_history.clone(); + self.state.persist(&state)?; + } info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -715,13 +724,13 @@ where // Also note that commit_lsn can reach epoch_start_lsn earlier // that we receive new epoch_start_lsn, and we still need to sync // control file in this case. - if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn { + if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn { self.persist_control_file()?; } // We got our first commit_lsn, which means we should sync // everything to disk, to initialize the state. 
- if self.s.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) { + if self.state.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) { self.wal_store.flush_wal()?; self.persist_control_file()?; } @@ -731,10 +740,12 @@ where /// Persist in-memory state to the disk. fn persist_control_file(&mut self) -> Result<()> { - self.s.commit_lsn = self.inmem.commit_lsn; - self.s.peer_horizon_lsn = self.inmem.peer_horizon_lsn; + let mut state = self.state.clone(); - self.control_store.persist(&self.s) + state.commit_lsn = self.inmem.commit_lsn; + state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; + state.proposer_uuid = self.inmem.proposer_uuid; + self.state.persist(&state) } /// Handle request to append WAL. @@ -744,13 +755,13 @@ where msg: &AppendRequest, require_flush: bool, ) -> Result<Option<AcceptorProposerMessage>> { - if self.s.acceptor_state.term < msg.h.term { + if self.state.acceptor_state.term < msg.h.term { bail!("got AppendRequest before ProposerElected"); } // If our term is higher, immediately refuse the message. - if self.s.acceptor_state.term > msg.h.term { - let resp = AppendResponse::term_only(self.s.acceptor_state.term); + if self.state.acceptor_state.term > msg.h.term { + let resp = AppendResponse::term_only(self.state.acceptor_state.term); return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } @@ -758,8 +769,7 @@ where // processing the message. self.epoch_start_lsn = msg.h.epoch_start_lsn; - // TODO: don't update state without persisting to disk - self.s.proposer_uuid = msg.h.proposer_uuid; + self.inmem.proposer_uuid = msg.h.proposer_uuid; // do the job if !msg.wal_data.is_empty() { @@ -790,7 +800,7 @@ where // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only // when truncate_lsn delta exceeds WAL segment size. 
- if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) + if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) < self.inmem.peer_horizon_lsn { self.persist_control_file()?; @@ -829,6 +839,8 @@ where #[cfg(test)] mod tests { + use std::ops::Deref; + use super::*; use crate::wal_storage::Storage; @@ -844,6 +856,14 @@ mod tests { } } + impl Deref for InMemoryState { + type Target = SafeKeeperState; + + fn deref(&self) -> &Self::Target { + &self.persisted_state + } + } + struct DummyWalStore { lsn: Lsn, } @@ -879,7 +899,7 @@ mod tests { }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()).unwrap(); + let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap(); // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); @@ -890,11 +910,11 @@ mod tests { } // reboot... - let state = sk.control_store.persisted_state.clone(); + let state = sk.state.persisted_state.clone(); let storage = InMemoryState { - persisted_state: state.clone(), + persisted_state: state, }; - sk = SafeKeeper::new(ztli, storage, sk.wal_store, state).unwrap(); + sk = SafeKeeper::new(ztli, storage, sk.wal_store).unwrap(); // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); @@ -911,7 +931,7 @@ mod tests { }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()).unwrap(); + let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap(); let mut ar_hdr = AppendRequestHeader { term: 1, diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index a76ef77615..a2941a9a5c 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -21,7 +21,6 @@ use crate::broker::SafekeeperInfo; use crate::callmemaybe::{CallmeEvent, 
SubscriptionStateKey}; use crate::control_file; -use crate::control_file::Storage as cf_storage; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, @@ -98,10 +97,9 @@ impl SharedState { peer_ids: Vec<ZNodeId>, ) -> Result<Self> { let state = SafeKeeperState::new(zttid, peer_ids); - let control_store = control_file::FileStorage::new(zttid, conf); + let control_store = control_file::FileStorage::create_new(zttid, conf, state)?; let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); - let mut sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state)?; - sk.control_store.persist(&sk.s)?; + let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?; Ok(Self { notified_commit_lsn: Lsn(0), @@ -116,18 +114,14 @@ impl SharedState { /// Restore SharedState from control file. /// If file doesn't exist, bails out. fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> { - let state = control_file::FileStorage::load_control_file_conf(conf, zttid) - .context("failed to load from control file")?; - - let control_store = control_file::FileStorage::new(zttid, conf); - + let control_store = control_file::FileStorage::restore_new(zttid, conf)?; let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); info!("timeline {} restored", zttid.timeline_id); Ok(Self { notified_commit_lsn: Lsn(0), - sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state)?, + sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?, replicas: Vec::new(), active: false, num_computes: 0, @@ -419,7 +413,7 @@ impl Timeline { pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { let shared_state = self.mutex.lock().unwrap(); - (shared_state.sk.inmem.clone(), shared_state.sk.s.clone()) + (shared_state.sk.inmem.clone(), shared_state.sk.state.clone()) } /// Prepare public safekeeper info for reporting.