Remember timeline_start_lsn and local_start_lsn on safekeeper.

Make the safekeeper remember when the timeline starts in general and on this
safekeeper in particular (the latter point might be later on a new safekeeper
replacing a failed one).

Bump the control file and walproposer protocol versions.

While the protocol is bumped anyway, also add the safekeeper node id to
AcceptorGreeting.

ref #1561
Arseny Sher
2022-05-01 16:58:34 +04:00
parent 748c5a577b
commit b9fd8a36ad
7 changed files with 114 additions and 13 deletions
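
For orientation, the protocol-v2 wire changes in this commit are small: AcceptorGreeting now carries the safekeeper's node id right after the term, and both VoteResponse and ProposerElected carry timeline_start_lsn appended after the term history (see the serialization changes in the safekeeper diff below). As a hedged illustration of the resulting byte layout, here is a hypothetical proposer-side decoder for the new greeting; the real walproposer counterpart is not part of this diff, and the helper below exists only for illustration (it assumes the `bytes` and `anyhow` crates):

use anyhow::{bail, Result};
use bytes::{Buf, Bytes};

/// Hypothetical proposer-side view of the protocol-v2 AcceptorGreeting body.
#[derive(Debug)]
struct AcceptorGreetingV2 {
    term: u64,
    node_id: u64, // new in protocol v2
}

/// Decode the 'g' tag, the term and the new node_id, all little-endian u64s,
/// mirroring AcceptorProposerMessage::serialize as changed in this commit.
fn parse_acceptor_greeting_v2(mut body: Bytes) -> Result<AcceptorGreetingV2> {
    if body.remaining() < 24 {
        bail!("AcceptorGreeting message is not complete");
    }
    let tag = body.get_u64_le();
    if tag != 'g' as u64 {
        bail!("unexpected message tag {}", tag);
    }
    Ok(AcceptorGreetingV2 {
        term: body.get_u64_le(),
        node_id: body.get_u64_le(),
    })
}

For example, a greeting from safekeeper node 3 at term 7 arrives as three little-endian u64s: 0x67 ('g'), 7, 3.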

View File

@@ -103,6 +103,43 @@ pub struct SafeKeeperStateV3 {
pub wal_start_lsn: Lsn,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperStateV4 {
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
/// Zenith timeline id
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// Part of WAL acknowledged by quorum and available locally. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// First LSN not yet offloaded to s3. Useful to persist so that offloading
/// progress doesn't have to be rediscovered on boot.
pub s3_wal_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of the last record streamed to everyone). Persisting it helps to skip
/// recovery in walproposer; generally we compute it from peers. In the
/// walproposer protocol it is called 'truncate_lsn'.
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes; we receive it from the pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing the peers themselves is
// fundamental; their state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's reserve a
// place here to avoid extra file format version bumps.)
pub peers: Peers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
// migrate to storing full term history
if version == 1 {
@@ -125,6 +162,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
wal_seg_size: oldstate.server.wal_seg_size,
},
proposer_uuid: oldstate.proposer_uuid,
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
@@ -146,6 +185,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
acceptor_state: oldstate.acceptor_state,
server,
proposer_uuid: oldstate.proposer_uuid,
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
@@ -167,6 +208,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
acceptor_state: oldstate.acceptor_state,
server,
proposer_uuid: oldstate.proposer_uuid,
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
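
The migration arms above (for pre-v4 control files) simply zero-fill the two new fields, since older files could not have recorded them; presumably the new v4 -> v5 arm, which is not visible in this excerpt, does the same while carrying everything else over from SafeKeeperStateV4. A self-contained miniature of that upgrade pattern, with made-up two-field structs and plain serde/bincode standing in for the repository's real types and (de)serialization helpers:

use serde::{Deserialize, Serialize};

/// Stand-in for the frozen previous on-disk format (think SafeKeeperStateV4).
#[derive(Serialize, Deserialize)]
struct StateV4 {
    commit_lsn: u64,
}

/// Stand-in for the current format; the fields added here default to 0.
#[derive(Serialize, Deserialize, Debug)]
struct StateV5 {
    commit_lsn: u64,
    timeline_start_lsn: u64,
    local_start_lsn: u64,
}

/// Lift an old serialized state to the current struct, zero-filling the new
/// fields, in the spirit of the upgrade_control_file arms above.
fn upgrade(buf: &[u8], version: u32) -> anyhow::Result<StateV5> {
    if version == 4 {
        let old: StateV4 = bincode::deserialize(buf)?;
        return Ok(StateV5 {
            commit_lsn: old.commit_lsn,
            timeline_start_lsn: 0,
            local_start_lsn: 0,
        });
    }
    anyhow::bail!("unsupported control file version {}", version)
}

fn main() -> anyhow::Result<()> {
    let old_bytes = bincode::serialize(&StateV4 { commit_lsn: 42 })?;
    println!("{:?}", upgrade(&old_bytes, 4)?);
    Ok(())
}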

View File

@@ -69,6 +69,10 @@ struct TimelineStatus {
timeline_id: ZTimelineId,
acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")]
timeline_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
local_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
s3_wal_lsn: Lsn,
@@ -102,6 +106,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: acc_state,
timeline_start_lsn: state.timeline_start_lsn,
local_start_lsn: state.local_start_lsn,
commit_lsn: inmem.commit_lsn,
s3_wal_lsn: inmem.s3_wal_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,

View File

@@ -95,7 +95,7 @@ pub fn handle_json_ctrl(
/// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
- protocol_version: 1, // current protocol
+ protocol_version: 2, // current protocol
pg_version: 0, // unknown
proposer_id: [0u8; 16],
system_id: 0,
@@ -124,6 +124,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: Lsn(0),
});
spg.timeline.get().process_msg(&proposer_elected_request)?;

View File

@@ -30,8 +30,8 @@ use utils::{
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
- pub const SK_FORMAT_VERSION: u32 = 4;
- const SK_PROTOCOL_VERSION: u32 = 1;
+ pub const SK_FORMAT_VERSION: u32 = 5;
+ const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
@@ -52,7 +52,7 @@ impl TermHistory {
}
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
- pub fn from_bytes(mut bytes: Bytes) -> Result<TermHistory> {
+ pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
@@ -183,6 +183,13 @@ pub struct SafeKeeperState {
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// LSN from which this timeline generally starts. This safekeeper might have
/// joined later.
pub timeline_start_lsn: Lsn,
/// LSN from which this safekeeper has (had) WAL for this timeline.
/// All WAL segments after the one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum and available locally. Always points
/// to record boundary.
pub commit_lsn: Lsn,
@@ -231,6 +238,8 @@ impl SafeKeeperState {
wal_seg_size: 0,
},
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: Lsn(0),
s3_wal_lsn: Lsn(0),
peer_horizon_lsn: Lsn(0),
@@ -268,6 +277,7 @@ pub struct ProposerGreeting {
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: ZNodeId,
}
/// Vote request sent from proposer to safekeepers
@@ -286,6 +296,7 @@ pub struct VoteResponse {
flush_lsn: Lsn,
truncate_lsn: Lsn,
term_history: TermHistory,
timeline_start_lsn: Lsn,
}
/*
@@ -297,6 +308,7 @@ pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
pub timeline_start_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -387,10 +399,15 @@ impl ProposerAcceptorMessage {
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
- let term_history = TermHistory::from_bytes(msg_bytes)?;
+ let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let timeline_start_lsn = msg_bytes.get_u64_le().into();
let msg = ProposerElected {
term,
start_streaming_at,
timeline_start_lsn,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
@@ -437,6 +454,7 @@ impl AcceptorProposerMessage {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u64_le('v' as u64);
@@ -449,6 +467,7 @@ impl AcceptorProposerMessage {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
buf.put_u64_le(msg.timeline_start_lsn.into());
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u64_le('a' as u64);
@@ -511,6 +530,8 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
pub state: CTRL, // persistent state storage
pub wal_store: WAL,
node_id: ZNodeId, // safekeeper's node id
}
impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
@@ -523,6 +544,7 @@ where
ztli: ZTimelineId,
state: CTRL,
mut wal_store: WAL,
node_id: ZNodeId,
) -> Result<SafeKeeper<CTRL, WAL>> {
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
@@ -544,6 +566,7 @@ where
},
state,
wal_store,
node_id,
})
}
@@ -635,6 +658,7 @@ where
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,
node_id: self.node_id,
})))
}
@@ -650,6 +674,7 @@ where
flush_lsn: self.wal_store.flush_lsn(),
truncate_lsn: self.state.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
@@ -705,6 +730,23 @@ where
// and now adopt term history from proposer
{
let mut state = self.state.clone();
// Remember the point where WAL begins globally, if not yet set.
if state.timeline_start_lsn == Lsn(0) {
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
// Remember the point where WAL begins locally, if not yet set. (I doubt the
// second condition is ever possible.)
if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
state.local_start_lsn = msg.start_streaming_at;
info!("setting local_start_lsn to {:?}", state.local_start_lsn);
}
state.acceptor_state.term_history = msg.term_history.clone();
self.state.persist(&state)?;
}
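
To make the semantics of the two new fields concrete: timeline_start_lsn is latched once, from the first ProposerElected message that carries it, while local_start_lsn records where this particular safekeeper's own WAL begins (the start_streaming_at of its first election, which can be well past the timeline start on a node added later). A self-contained toy sketch of that latching, with example LSN values and a bare newtype standing in for the repository's Lsn type:

/// Toy stand-in for the repository's Lsn newtype, for illustration only.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Default)]
struct Lsn(u64);

/// The two persistent fields added by this commit, in isolation.
#[derive(Debug, Default)]
struct StartLsns {
    timeline_start_lsn: Lsn,
    local_start_lsn: Lsn,
}

impl StartLsns {
    /// Mirrors the latching in the hunk above: the timeline-wide start is
    /// recorded once; the local start follows start_streaming_at of the first
    /// election this safekeeper handles.
    fn on_elected(&mut self, timeline_start_lsn: Lsn, start_streaming_at: Lsn) {
        if self.timeline_start_lsn == Lsn(0) {
            self.timeline_start_lsn = timeline_start_lsn;
        }
        if self.local_start_lsn == Lsn(0) || self.local_start_lsn >= start_streaming_at {
            self.local_start_lsn = start_streaming_at;
        }
    }
}

fn main() {
    // Example: a safekeeper added after timeline creation. The timeline started
    // at 0x0169_6128, but this node first streams from 0x0200_0000.
    let mut s = StartLsns::default();
    s.on_elected(Lsn(0x0169_6128), Lsn(0x0200_0000));
    assert_eq!(s.timeline_start_lsn, Lsn(0x0169_6128));
    assert_eq!(s.local_start_lsn, Lsn(0x0200_0000));

    // A later election with a higher start_streaming_at leaves both values alone.
    s.on_elected(Lsn(0x0169_6128), Lsn(0x0300_0000));
    assert_eq!(s.timeline_start_lsn, Lsn(0x0169_6128));
    assert_eq!(s.local_start_lsn, Lsn(0x0200_0000));
}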
@@ -968,7 +1010,7 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
- let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
+ let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
@@ -983,7 +1025,7 @@ mod tests {
let storage = InMemoryState {
persisted_state: state,
};
- sk = SafeKeeper::new(ztli, storage, sk.wal_store).unwrap();
+ sk = SafeKeeper::new(ztli, storage, sk.wal_store, ZNodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request);
@@ -1000,7 +1042,7 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
- let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
+ let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
term: 1,
@@ -1023,6 +1065,7 @@ mod tests {
term: 1,
lsn: Lsn(3),
}]),
timeline_start_lsn: Lsn(0),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.unwrap();

View File

@@ -102,7 +102,7 @@ impl SharedState {
let state = SafeKeeperState::new(zttid, peer_ids);
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
- let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?;
+ let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
Ok(Self {
notified_commit_lsn: Lsn(0),
@@ -125,7 +125,7 @@ impl SharedState {
Ok(Self {
notified_commit_lsn: Lsn(0),
- sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?,
+ sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
active: false,
num_computes: 0,

View File

@@ -573,7 +573,9 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
# fetch something sensible from status
- epoch = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
+ tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
+ epoch = tli_status.acceptor_epoch
+ timeline_start_lsn = tli_status.timeline_start_lsn
pg.safe_psql("create table t(i int)")
@@ -581,9 +583,13 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
pg.stop().start()
pg.safe_psql("insert into t values(10)")
- epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
+ tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
+ epoch_after_reboot = tli_status.acceptor_epoch
assert epoch_after_reboot > epoch
# and timeline_start_lsn stays the same
assert tli_status.timeline_start_lsn == timeline_start_lsn
class SafekeeperEnv:
def __init__(self,

View File

@@ -1762,6 +1762,7 @@ class SafekeeperTimelineStatus:
acceptor_epoch: int
flush_lsn: str
remote_consistent_lsn: str
timeline_start_lsn: str
@dataclass
@@ -1786,7 +1787,8 @@ class SafekeeperHttpClient(requests.Session):
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'],
- remote_consistent_lsn=resj['remote_consistent_lsn'])
+ remote_consistent_lsn=resj['remote_consistent_lsn'],
+ timeline_start_lsn=resj['timeline_start_lsn'])
def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
res = self.post(