mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Implement state guard for updating control file
This commit is contained in:
@@ -6,6 +6,7 @@ use lazy_static::lazy_static;
|
||||
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use tracing::*;
|
||||
@@ -37,30 +38,110 @@ lazy_static! {
|
||||
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec");
|
||||
}
|
||||
|
||||
pub trait Storage {
|
||||
pub trait StatePersister {
|
||||
/// Persist safekeeper state on disk.
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Storage should keep actual state inside of it. It should implement Deref
|
||||
/// trait to access state fields and have persist method for updating that state.
|
||||
pub trait Storage: Deref<Target = SafeKeeperState> + StatePersister {
|
||||
/// Returns a guard which implements DeferMut trait and have persist method.
|
||||
fn update_guard(&mut self) -> StateGuard<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
StateGuard::new(self.clone(), self)
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard that allows safekeeper state to be updated atomically.
|
||||
pub struct StateGuard<'a, P: StatePersister> {
|
||||
persister: &'a mut P,
|
||||
state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl<'a, P> StateGuard<'a, P>
|
||||
where
|
||||
P: StatePersister,
|
||||
{
|
||||
pub fn new(state: SafeKeeperState, persister: &'a mut P) -> Self {
|
||||
StateGuard { persister, state }
|
||||
}
|
||||
|
||||
pub fn persist(&mut self) -> Result<()> {
|
||||
self.persister.persist(&self.state)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P> Deref for StateGuard<'a, P>
|
||||
where
|
||||
P: StatePersister,
|
||||
{
|
||||
type Target = SafeKeeperState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P> DerefMut for StateGuard<'a, P>
|
||||
where
|
||||
P: StatePersister,
|
||||
{
|
||||
fn deref_mut(&mut self) -> &mut SafeKeeperState {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FileStorage {
|
||||
// save timeline dir to avoid reconstructing it every time
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
persist_control_file_seconds: Histogram,
|
||||
|
||||
/// Last state persisted to disk.
|
||||
state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage {
|
||||
pub fn restore_new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
let timeline_id = zttid.timeline_id.to_string();
|
||||
FileStorage {
|
||||
|
||||
let state = Self::load_control_file_conf(conf, zttid)
|
||||
.context("failed to load from control file")?;
|
||||
|
||||
Ok(FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
}
|
||||
state,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_new(
|
||||
zttid: &ZTenantTimelineId,
|
||||
conf: &SafeKeeperConf,
|
||||
state: SafeKeeperState,
|
||||
) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
let timeline_id = zttid.timeline_id.to_string();
|
||||
|
||||
let mut store = FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
state: state.clone(),
|
||||
};
|
||||
|
||||
store.persist(&state)?;
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
@@ -141,7 +222,17 @@ impl FileStorage {
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
impl Storage for FileStorage {}
|
||||
|
||||
impl Deref for FileStorage {
|
||||
type Target = SafeKeeperState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl StatePersister for FileStorage {
|
||||
// persists state durably to underlying storage
|
||||
// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
@@ -201,6 +292,9 @@ impl Storage for FileStorage {
|
||||
.and_then(|f| f.sync_all())
|
||||
.context("failed to sync control file directory")?;
|
||||
}
|
||||
|
||||
// update internal state
|
||||
self.state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -228,7 +322,7 @@ mod test {
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir");
|
||||
Ok((
|
||||
FileStorage::new(zttid, conf),
|
||||
FileStorage::restore_new(zttid, conf)?,
|
||||
FileStorage::load_control_file_conf(conf, zttid)?,
|
||||
))
|
||||
}
|
||||
@@ -239,8 +333,7 @@ mod test {
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir");
|
||||
let state = SafeKeeperState::empty();
|
||||
let mut storage = FileStorage::new(zttid, conf);
|
||||
storage.persist(&state)?;
|
||||
let storage = FileStorage::create_new(zttid, conf, state.clone())?;
|
||||
Ok((storage, state))
|
||||
}
|
||||
|
||||
|
||||
@@ -210,6 +210,7 @@ pub struct SafekeeperMemState {
|
||||
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub proposer_uuid: PgUuid,
|
||||
}
|
||||
|
||||
impl SafeKeeperState {
|
||||
@@ -502,9 +503,8 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
epoch_start_lsn: Lsn,
|
||||
|
||||
pub inmem: SafekeeperMemState, // in memory part
|
||||
pub s: SafeKeeperState, // persistent part
|
||||
pub state: CTRL, // persistent state storage
|
||||
|
||||
pub control_store: CTRL,
|
||||
pub wal_store: WAL,
|
||||
}
|
||||
|
||||
@@ -514,12 +514,7 @@ where
|
||||
WAL: wal_storage::Storage,
|
||||
{
|
||||
// constructor
|
||||
pub fn new(
|
||||
ztli: ZTimelineId,
|
||||
control_store: CTRL,
|
||||
mut wal_store: WAL,
|
||||
state: SafeKeeperState,
|
||||
) -> Result<SafeKeeper<CTRL, WAL>> {
|
||||
pub fn new(ztli: ZTimelineId, state: CTRL, wal_store: WAL) -> Result<SafeKeeper<CTRL, WAL>> {
|
||||
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
|
||||
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
|
||||
}
|
||||
@@ -535,23 +530,25 @@ where
|
||||
s3_wal_lsn: state.s3_wal_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
proposer_uuid: state.proposer_uuid,
|
||||
},
|
||||
s: state,
|
||||
control_store,
|
||||
state,
|
||||
wal_store,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get history of term switches for the available WAL
|
||||
fn get_term_history(&self) -> TermHistory {
|
||||
self.s
|
||||
self.state
|
||||
.acceptor_state
|
||||
.term_history
|
||||
.up_to(self.wal_store.flush_lsn())
|
||||
}
|
||||
|
||||
pub fn get_epoch(&self) -> Term {
|
||||
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
|
||||
self.state
|
||||
.acceptor_state
|
||||
.get_epoch(self.wal_store.flush_lsn())
|
||||
}
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
@@ -587,46 +584,47 @@ where
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
if msg.pg_version != self.s.server.pg_version
|
||||
&& self.s.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
if msg.pg_version != self.state.server.pg_version
|
||||
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version, self.s.server.pg_version
|
||||
msg.pg_version, self.state.server.pg_version
|
||||
);
|
||||
}
|
||||
if msg.tenant_id != self.s.tenant_id {
|
||||
if msg.tenant_id != self.state.tenant_id {
|
||||
bail!(
|
||||
"invalid tenant ID, got {}, expected {}",
|
||||
msg.tenant_id,
|
||||
self.s.tenant_id
|
||||
self.state.tenant_id
|
||||
);
|
||||
}
|
||||
if msg.ztli != self.s.timeline_id {
|
||||
if msg.ztli != self.state.timeline_id {
|
||||
bail!(
|
||||
"invalid timeline ID, got {}, expected {}",
|
||||
msg.ztli,
|
||||
self.s.timeline_id
|
||||
self.state.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
// set basic info about server, if not yet
|
||||
// TODO: verify that is doesn't change after
|
||||
self.s.server.system_id = msg.system_id;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.control_store
|
||||
.persist(&self.s)
|
||||
.context("failed to persist shared state")?;
|
||||
{
|
||||
let mut state = self.state.update_guard();
|
||||
state.server.system_id = msg.system_id;
|
||||
state.server.wal_seg_size = msg.wal_seg_size;
|
||||
state.persist()?;
|
||||
}
|
||||
|
||||
// pass wal_seg_size to read WAL and find flush_lsn
|
||||
self.wal_store.init_storage(&self.s)?;
|
||||
self.wal_store.init_storage(&self.state)?;
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
msg.proposer_id, self.s.acceptor_state.term
|
||||
msg.proposer_id, self.state.acceptor_state.term
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: self.state.acceptor_state.term,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -637,17 +635,19 @@ where
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: self.state.acceptor_state.term,
|
||||
vote_given: false as u64,
|
||||
flush_lsn: self.wal_store.flush_lsn(),
|
||||
truncate_lsn: self.s.peer_horizon_lsn,
|
||||
truncate_lsn: self.state.peer_horizon_lsn,
|
||||
term_history: self.get_term_history(),
|
||||
};
|
||||
if self.s.acceptor_state.term < msg.term {
|
||||
self.s.acceptor_state.term = msg.term;
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.update_guard();
|
||||
state.acceptor_state.term = msg.term;
|
||||
// persist vote before sending it out
|
||||
self.control_store.persist(&self.s)?;
|
||||
resp.term = self.s.acceptor_state.term;
|
||||
state.persist()?;
|
||||
|
||||
resp.term = self.state.acceptor_state.term;
|
||||
resp.vote_given = true as u64;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
@@ -656,9 +656,10 @@ where
|
||||
|
||||
/// Bump our term if received a note from elected proposer with higher one
|
||||
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
|
||||
if self.s.acceptor_state.term < term {
|
||||
self.s.acceptor_state.term = term;
|
||||
self.control_store.persist(&self.s)?;
|
||||
if self.state.acceptor_state.term < term {
|
||||
let mut state = self.state.update_guard();
|
||||
state.acceptor_state.term = term;
|
||||
state.persist()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -666,9 +667,9 @@ where
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let ar = AppendResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: self.state.acceptor_state.term,
|
||||
flush_lsn: self.wal_store.flush_lsn(),
|
||||
commit_lsn: self.s.commit_lsn,
|
||||
commit_lsn: self.state.commit_lsn,
|
||||
// will be filled by the upper code to avoid bothering safekeeper
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
zenith_feedback: ZenithFeedback::empty(),
|
||||
@@ -681,7 +682,7 @@ where
|
||||
info!("received ProposerElected {:?}", msg);
|
||||
self.bump_if_higher(msg.term)?;
|
||||
// If our term is higher, ignore the message (next feedback will inform the compute)
|
||||
if self.s.acceptor_state.term > msg.term {
|
||||
if self.state.acceptor_state.term > msg.term {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -692,8 +693,11 @@ where
|
||||
self.wal_store.truncate_wal(msg.start_streaming_at)?;
|
||||
|
||||
// and now adopt term history from proposer
|
||||
self.s.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.control_store.persist(&self.s)?;
|
||||
{
|
||||
let mut state = self.state.update_guard();
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
state.persist()?;
|
||||
}
|
||||
|
||||
info!("start receiving WAL since {:?}", msg.start_streaming_at);
|
||||
|
||||
@@ -715,13 +719,13 @@ where
|
||||
// Also note that commit_lsn can reach epoch_start_lsn earlier
|
||||
// that we receive new epoch_start_lsn, and we still need to sync
|
||||
// control file in this case.
|
||||
if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn {
|
||||
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
|
||||
self.persist_control_file()?;
|
||||
}
|
||||
|
||||
// We got our first commit_lsn, which means we should sync
|
||||
// everything to disk, to initialize the state.
|
||||
if self.s.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
|
||||
if self.state.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
|
||||
self.wal_store.flush_wal()?;
|
||||
self.persist_control_file()?;
|
||||
}
|
||||
@@ -731,10 +735,12 @@ where
|
||||
|
||||
/// Persist in-memory state to the disk.
|
||||
fn persist_control_file(&mut self) -> Result<()> {
|
||||
self.s.commit_lsn = self.inmem.commit_lsn;
|
||||
self.s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
let mut state = self.state.update_guard();
|
||||
|
||||
self.control_store.persist(&self.s)
|
||||
state.commit_lsn = self.inmem.commit_lsn;
|
||||
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
state.proposer_uuid = self.inmem.proposer_uuid;
|
||||
state.persist()
|
||||
}
|
||||
|
||||
/// Handle request to append WAL.
|
||||
@@ -744,13 +750,13 @@ where
|
||||
msg: &AppendRequest,
|
||||
require_flush: bool,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
if self.s.acceptor_state.term < msg.h.term {
|
||||
if self.state.acceptor_state.term < msg.h.term {
|
||||
bail!("got AppendRequest before ProposerElected");
|
||||
}
|
||||
|
||||
// If our term is higher, immediately refuse the message.
|
||||
if self.s.acceptor_state.term > msg.h.term {
|
||||
let resp = AppendResponse::term_only(self.s.acceptor_state.term);
|
||||
if self.state.acceptor_state.term > msg.h.term {
|
||||
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
|
||||
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
|
||||
}
|
||||
|
||||
@@ -758,8 +764,7 @@ where
|
||||
// processing the message.
|
||||
|
||||
self.epoch_start_lsn = msg.h.epoch_start_lsn;
|
||||
// TODO: don't update state without persisting to disk
|
||||
self.s.proposer_uuid = msg.h.proposer_uuid;
|
||||
self.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
@@ -790,7 +795,7 @@ where
|
||||
// Update truncate and commit LSN in control file.
|
||||
// To avoid negative impact on performance of extra fsync, do it only
|
||||
// when truncate_lsn delta exceeds WAL segment size.
|
||||
if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64)
|
||||
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
|
||||
< self.inmem.peer_horizon_lsn
|
||||
{
|
||||
self.persist_control_file()?;
|
||||
@@ -829,21 +834,33 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Deref;
|
||||
|
||||
use super::*;
|
||||
use crate::wal_storage::Storage;
|
||||
use crate::{control_file::StatePersister, wal_storage::Storage};
|
||||
|
||||
// fake storage for tests
|
||||
struct InMemoryState {
|
||||
persisted_state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl control_file::Storage for InMemoryState {
|
||||
impl control_file::Storage for InMemoryState {}
|
||||
|
||||
impl StatePersister for InMemoryState {
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for InMemoryState {
|
||||
type Target = SafeKeeperState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.persisted_state
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyWalStore {
|
||||
lsn: Lsn,
|
||||
}
|
||||
@@ -879,7 +896,7 @@ mod tests {
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()).unwrap();
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
@@ -890,11 +907,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// reboot...
|
||||
let state = sk.control_store.persisted_state.clone();
|
||||
let state = sk.state.persisted_state.clone();
|
||||
let storage = InMemoryState {
|
||||
persisted_state: state.clone(),
|
||||
persisted_state: state,
|
||||
};
|
||||
sk = SafeKeeper::new(ztli, storage, sk.wal_store, state).unwrap();
|
||||
sk = SafeKeeper::new(ztli, storage, sk.wal_store).unwrap();
|
||||
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request);
|
||||
@@ -911,7 +928,7 @@ mod tests {
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()).unwrap();
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
|
||||
@@ -21,7 +21,6 @@ use crate::broker::SafekeeperInfo;
|
||||
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
||||
|
||||
use crate::control_file;
|
||||
use crate::control_file::Storage as cf_storage;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState,
|
||||
@@ -98,10 +97,9 @@ impl SharedState {
|
||||
peer_ids: Vec<ZNodeId>,
|
||||
) -> Result<Self> {
|
||||
let state = SafeKeeperState::new(zttid, peer_ids);
|
||||
let control_store = control_file::FileStorage::new(zttid, conf);
|
||||
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
|
||||
let mut sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state)?;
|
||||
sk.control_store.persist(&sk.s)?;
|
||||
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?;
|
||||
|
||||
Ok(Self {
|
||||
notified_commit_lsn: Lsn(0),
|
||||
@@ -116,18 +114,14 @@ impl SharedState {
|
||||
/// Restore SharedState from control file.
|
||||
/// If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> {
|
||||
let state = control_file::FileStorage::load_control_file_conf(conf, zttid)
|
||||
.context("failed to load from control file")?;
|
||||
|
||||
let control_store = control_file::FileStorage::new(zttid, conf);
|
||||
|
||||
let control_store = control_file::FileStorage::restore_new(zttid, conf)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
|
||||
|
||||
info!("timeline {} restored", zttid.timeline_id);
|
||||
|
||||
Ok(Self {
|
||||
notified_commit_lsn: Lsn(0),
|
||||
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state)?,
|
||||
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?,
|
||||
replicas: Vec::new(),
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
@@ -419,7 +413,7 @@ impl Timeline {
|
||||
|
||||
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
(shared_state.sk.inmem.clone(), shared_state.sk.s.clone())
|
||||
(shared_state.sk.inmem.clone(), shared_state.sk.state.clone())
|
||||
}
|
||||
|
||||
/// Prepare public safekeeper info for reporting.
|
||||
|
||||
Reference in New Issue
Block a user