Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-24 22:59:59 +00:00
Remember timeline_start_lsn and local_start_lsn on safekeeper.
Remember where the timeline starts in general and on this safekeeper in particular (the local point may be later, e.g. when a new safekeeper replaces a failed one). Bumps the control file and walproposer protocol versions. Since the protocol is bumped anyway, also adds the safekeeper node id to AcceptorProposerGreeting. ref #1561
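
To make the wire change concrete before the diff: in protocol v2 the safekeeper's greeting reply now carries its node id after the term. A minimal standalone sketch of that layout, assuming the bytes crate; encode_greeting/decode_greeting are hypothetical helper names, not code from this commit (the real encoding lives in AcceptorProposerMessage::serialize below):

use bytes::{Buf, BufMut, Bytes, BytesMut};

// Sketch of the protocol-v2 AcceptorGreeting wire layout:
// 'g' tag (u64 LE), term (u64 LE), node_id (u64 LE, new in v2).
fn encode_greeting(term: u64, node_id: u64, buf: &mut BytesMut) {
    buf.put_u64_le('g' as u64); // message tag, as in the serialize hunk below
    buf.put_u64_le(term);
    buf.put_u64_le(node_id); // new in protocol v2
}

fn decode_greeting(bytes: &mut Bytes) -> Result<(u64, u64), String> {
    if bytes.remaining() < 24 {
        return Err("AcceptorGreeting is not complete".into());
    }
    let tag = bytes.get_u64_le();
    if tag != 'g' as u64 {
        return Err(format!("unexpected tag {}", tag));
    }
    Ok((bytes.get_u64_le(), bytes.get_u64_le()))
}

fn main() {
    let mut buf = BytesMut::new();
    encode_greeting(5, 1, &mut buf);
    let (term, node_id) = decode_greeting(&mut buf.freeze()).unwrap();
    assert_eq!((term, node_id), (5, 1));
}
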
@@ -103,6 +103,43 @@ pub struct SafeKeeperStateV3 {
     pub wal_start_lsn: Lsn,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SafeKeeperStateV4 {
+    #[serde(with = "hex")]
+    pub tenant_id: ZTenantId,
+    /// Zenith timeline id
+    #[serde(with = "hex")]
+    pub timeline_id: ZTimelineId,
+    /// persistent acceptor state
+    pub acceptor_state: AcceptorState,
+    /// information about server
+    pub server: ServerInfo,
+    /// Unique id of the last *elected* proposer we dealt with. Not needed
+    /// for correctness, exists for monitoring purposes.
+    #[serde(with = "hex")]
+    pub proposer_uuid: PgUuid,
+    /// Part of WAL acknowledged by quorum and available locally. Always points
+    /// to record boundary.
+    pub commit_lsn: Lsn,
+    /// First LSN not yet offloaded to s3. Useful to persist to avoid finding
+    /// out offloading progress on boot.
+    pub s3_wal_lsn: Lsn,
+    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
+    /// of last record streamed to everyone). Persisting it helps skipping
+    /// recovery in walproposer, generally we compute it from peers. In
+    /// walproposer proto called 'truncate_lsn'.
+    pub peer_horizon_lsn: Lsn,
+    /// LSN of the oldest known checkpoint made by pageserver and successfully
+    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
+    /// informational purposes, we receive it from pageserver (or broker).
+    pub remote_consistent_lsn: Lsn,
+    // Peers and their state as we remember it. Knowing peers themselves is
+    // fundamental; but state is saved here only for informational purposes and
+    // obviously can be stale. (Currently not saved at all, but let's provision
+    // a place to have fewer file version upgrades.)
+    pub peers: Peers,
+}
+
 pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
     // migrate to storing full term history
     if version == 1 {
@@ -125,6 +162,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
                 wal_seg_size: oldstate.server.wal_seg_size,
             },
             proposer_uuid: oldstate.proposer_uuid,
+            timeline_start_lsn: Lsn(0),
+            local_start_lsn: Lsn(0),
             commit_lsn: oldstate.commit_lsn,
             s3_wal_lsn: Lsn(0),
             peer_horizon_lsn: oldstate.truncate_lsn,
@@ -146,6 +185,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
             acceptor_state: oldstate.acceptor_state,
             server,
             proposer_uuid: oldstate.proposer_uuid,
+            timeline_start_lsn: Lsn(0),
+            local_start_lsn: Lsn(0),
             commit_lsn: oldstate.commit_lsn,
             s3_wal_lsn: Lsn(0),
             peer_horizon_lsn: oldstate.truncate_lsn,
@@ -167,6 +208,8 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
             acceptor_state: oldstate.acceptor_state,
             server,
             proposer_uuid: oldstate.proposer_uuid,
+            timeline_start_lsn: Lsn(0),
+            local_start_lsn: Lsn(0),
             commit_lsn: oldstate.commit_lsn,
             s3_wal_lsn: Lsn(0),
             peer_horizon_lsn: oldstate.truncate_lsn,
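
The three hunks above follow the same ladder pattern: each on-disk format version deserializes into its own struct and is converted step by step to the current one, with fields that did not exist yet (here timeline_start_lsn and local_start_lsn) initialized to the Lsn(0) "unknown" sentinel. A condensed sketch of that idea; StateV4, StateV5 and upgrade are hypothetical stand-ins for the real types:

// Condensed sketch of the version-ladder idea behind upgrade_control_file.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Lsn(u64);

struct StateV4 {
    commit_lsn: Lsn,
}

struct StateV5 {
    timeline_start_lsn: Lsn, // new in v5
    local_start_lsn: Lsn,    // new in v5
    commit_lsn: Lsn,
}

fn upgrade(old: StateV4) -> StateV5 {
    StateV5 {
        timeline_start_lsn: Lsn(0), // sentinel: filled on first ProposerElected
        local_start_lsn: Lsn(0),
        commit_lsn: old.commit_lsn, // carried over unchanged
    }
}

fn main() {
    let v5 = upgrade(StateV4 { commit_lsn: Lsn(42) });
    assert_eq!(v5.timeline_start_lsn, Lsn(0));
    assert_eq!(v5.commit_lsn, Lsn(42));
}
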
@@ -69,6 +69,10 @@ struct TimelineStatus {
     timeline_id: ZTimelineId,
     acceptor_state: AcceptorStateStatus,
+    #[serde(serialize_with = "display_serialize")]
+    timeline_start_lsn: Lsn,
+    #[serde(serialize_with = "display_serialize")]
+    local_start_lsn: Lsn,
     #[serde(serialize_with = "display_serialize")]
     commit_lsn: Lsn,
     #[serde(serialize_with = "display_serialize")]
     s3_wal_lsn: Lsn,
@@ -102,6 +106,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
         tenant_id: zttid.tenant_id,
         timeline_id: zttid.timeline_id,
         acceptor_state: acc_state,
+        timeline_start_lsn: state.timeline_start_lsn,
+        local_start_lsn: state.local_start_lsn,
         commit_lsn: inmem.commit_lsn,
         s3_wal_lsn: inmem.s3_wal_lsn,
         peer_horizon_lsn: inmem.peer_horizon_lsn,

@@ -95,7 +95,7 @@ pub fn handle_json_ctrl(
 /// by sending ProposerGreeting with default server.wal_seg_size.
 fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
     let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
-        protocol_version: 1, // current protocol
+        protocol_version: 2, // current protocol
         pg_version: 0, // unknown
         proposer_id: [0u8; 16],
         system_id: 0,
@@ -124,6 +124,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
         term,
         start_streaming_at: lsn,
         term_history: history,
+        timeline_start_lsn: Lsn(0),
     });
 
     spg.timeline.get().process_msg(&proposer_elected_request)?;

@@ -30,8 +30,8 @@ use utils::{
 };
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 4;
-const SK_PROTOCOL_VERSION: u32 = 1;
+pub const SK_FORMAT_VERSION: u32 = 5;
+const SK_PROTOCOL_VERSION: u32 = 2;
 const UNKNOWN_SERVER_VERSION: u32 = 0;
 
 /// Consensus logical timestamp.
@@ -52,7 +52,7 @@ impl TermHistory {
     }
 
     // Parse TermHistory as n_entries followed by TermSwitchEntry pairs
-    pub fn from_bytes(mut bytes: Bytes) -> Result<TermHistory> {
+    pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
         if bytes.remaining() < 4 {
             bail!("TermHistory misses len");
         }
@@ -183,6 +183,13 @@ pub struct SafeKeeperState {
     /// for correctness, exists for monitoring purposes.
     #[serde(with = "hex")]
     pub proposer_uuid: PgUuid,
+    /// LSN since which this timeline generally starts. The safekeeper might
+    /// have joined later.
+    pub timeline_start_lsn: Lsn,
+    /// LSN since which this safekeeper has (had) WAL for this timeline.
+    /// All WAL segments after the one containing local_start_lsn are
+    /// filled with data from the beginning.
+    pub local_start_lsn: Lsn,
     /// Part of WAL acknowledged by quorum and available locally. Always points
     /// to record boundary.
     pub commit_lsn: Lsn,
@@ -231,6 +238,8 @@ impl SafeKeeperState {
                 wal_seg_size: 0,
             },
             proposer_uuid: [0; 16],
+            timeline_start_lsn: Lsn(0),
+            local_start_lsn: Lsn(0),
             commit_lsn: Lsn(0),
             s3_wal_lsn: Lsn(0),
             peer_horizon_lsn: Lsn(0),
@@ -268,6 +277,7 @@ pub struct ProposerGreeting {
 #[derive(Debug, Serialize)]
 pub struct AcceptorGreeting {
     term: u64,
+    node_id: ZNodeId,
 }
 
 /// Vote request sent from proposer to safekeepers
@@ -286,6 +296,7 @@ pub struct VoteResponse {
     flush_lsn: Lsn,
     truncate_lsn: Lsn,
     term_history: TermHistory,
+    timeline_start_lsn: Lsn,
 }
 
 /*
@@ -297,6 +308,7 @@ pub struct ProposerElected {
     pub term: Term,
     pub start_streaming_at: Lsn,
     pub term_history: TermHistory,
+    pub timeline_start_lsn: Lsn,
 }
 
 /// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -387,10 +399,15 @@ impl ProposerAcceptorMessage {
             }
             let term = msg_bytes.get_u64_le();
             let start_streaming_at = msg_bytes.get_u64_le().into();
-            let term_history = TermHistory::from_bytes(msg_bytes)?;
+            let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
+            if msg_bytes.remaining() < 8 {
+                bail!("ProposerElected message is not complete");
+            }
+            let timeline_start_lsn = msg_bytes.get_u64_le().into();
             let msg = ProposerElected {
                 term,
                 start_streaming_at,
+                timeline_start_lsn,
                 term_history,
             };
             Ok(ProposerAcceptorMessage::Elected(msg))
@@ -437,6 +454,7 @@ impl AcceptorProposerMessage {
             AcceptorProposerMessage::Greeting(msg) => {
                 buf.put_u64_le('g' as u64);
                 buf.put_u64_le(msg.term);
+                buf.put_u64_le(msg.node_id.0);
             }
             AcceptorProposerMessage::VoteResponse(msg) => {
                 buf.put_u64_le('v' as u64);
@@ -449,6 +467,7 @@
                     buf.put_u64_le(e.term);
                     buf.put_u64_le(e.lsn.into());
                 }
+                buf.put_u64_le(msg.timeline_start_lsn.into());
             }
             AcceptorProposerMessage::AppendResponse(msg) => {
                 buf.put_u64_le('a' as u64);
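
Putting the parse and serialize hunks together, the protocol-v2 ProposerElected body is: term (u64 LE), start_streaming_at (u64 LE), a term history (u32 LE entry count followed by term/lsn u64 pairs), and a new trailing timeline_start_lsn (u64 LE). A standalone round-trip sketch of that layout; parse_elected is a hypothetical helper mirroring the length checks above, not the actual parser:

use bytes::{Buf, BufMut, Bytes, BytesMut};

fn parse_elected(b: &mut Bytes) -> Result<(u64, u64, Vec<(u64, u64)>, u64), String> {
    if b.remaining() < 16 {
        return Err("ProposerElected message is not complete".into());
    }
    let term = b.get_u64_le();
    let start_streaming_at = b.get_u64_le();
    // term history: u32 LE entry count, then (term, lsn) u64 LE pairs
    if b.remaining() < 4 {
        return Err("TermHistory misses len".into());
    }
    let n = b.get_u32_le() as usize;
    let mut term_history = Vec::with_capacity(n);
    for _ in 0..n {
        if b.remaining() < 16 {
            return Err("TermHistory is incomplete".into());
        }
        term_history.push((b.get_u64_le(), b.get_u64_le()));
    }
    // new in protocol v2: trailing timeline_start_lsn
    if b.remaining() < 8 {
        return Err("ProposerElected message is not complete".into());
    }
    Ok((term, start_streaming_at, term_history, b.get_u64_le()))
}

fn main() {
    let mut buf = BytesMut::new();
    buf.put_u64_le(2); // term
    buf.put_u64_le(0x16); // start_streaming_at
    buf.put_u32_le(1); // one term history entry
    buf.put_u64_le(2); // entry: term
    buf.put_u64_le(0x16); // entry: lsn
    buf.put_u64_le(0x16); // timeline_start_lsn (new in v2)
    let (term, _, history, tsl) = parse_elected(&mut buf.freeze()).unwrap();
    assert_eq!((term, history.len(), tsl), (2, 1, 0x16));
}
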
@@ -511,6 +530,8 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     pub state: CTRL, // persistent state storage
 
     pub wal_store: WAL,
+
+    node_id: ZNodeId, // safekeeper's node id
 }
 
 impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
@@ -523,6 +544,7 @@ where
         ztli: ZTimelineId,
         state: CTRL,
         mut wal_store: WAL,
+        node_id: ZNodeId,
     ) -> Result<SafeKeeper<CTRL, WAL>> {
         if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
             bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
@@ -544,6 +566,7 @@ where
             },
             state,
             wal_store,
+            node_id,
         })
     }
 
@@ -635,6 +658,7 @@ where
         );
         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
             term: self.state.acceptor_state.term,
+            node_id: self.node_id,
         })))
     }
 
@@ -650,6 +674,7 @@ where
             flush_lsn: self.wal_store.flush_lsn(),
             truncate_lsn: self.state.peer_horizon_lsn,
             term_history: self.get_term_history(),
+            timeline_start_lsn: self.state.timeline_start_lsn,
         };
         if self.state.acceptor_state.term < msg.term {
             let mut state = self.state.clone();
@@ -705,6 +730,23 @@ where
         // and now adopt term history from proposer
         {
             let mut state = self.state.clone();
+
+            // Remember the point where WAL begins globally, if not yet known.
+            if state.timeline_start_lsn == Lsn(0) {
+                state.timeline_start_lsn = msg.timeline_start_lsn;
+                info!(
+                    "setting timeline_start_lsn to {:?}",
+                    state.timeline_start_lsn
+                );
+            }
+
+            // Remember the point where WAL begins locally, if not yet known. (I doubt
+            // the second condition is ever possible.)
+            if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
+                state.local_start_lsn = msg.start_streaming_at;
+                info!("setting local_start_lsn to {:?}", state.local_start_lsn);
+            }
+
             state.acceptor_state.term_history = msg.term_history.clone();
             self.state.persist(&state)?;
         }
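
Note that the adoption logic above is write-once: timeline_start_lsn is only assigned while it still holds the Lsn(0) sentinel, so later elections cannot move it (the Python test at the bottom checks the same property across restarts). A toy model of that invariant; State and adopt are simplified hypothetical stand-ins:

#[derive(Debug, Clone, Copy, PartialEq)]
struct Lsn(u64);

struct State {
    timeline_start_lsn: Lsn,
}

// Remember the point where WAL begins globally, if not yet known.
fn adopt(state: &mut State, msg_timeline_start_lsn: Lsn) {
    if state.timeline_start_lsn == Lsn(0) {
        state.timeline_start_lsn = msg_timeline_start_lsn;
    }
}

fn main() {
    let mut state = State { timeline_start_lsn: Lsn(0) };
    adopt(&mut state, Lsn(0x16));
    adopt(&mut state, Lsn(0x99)); // a second election must not move the start
    assert_eq!(state.timeline_start_lsn, Lsn(0x16));
}
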
@@ -968,7 +1010,7 @@ mod tests {
         };
         let wal_store = DummyWalStore { lsn: Lsn(0) };
         let ztli = ZTimelineId::from([0u8; 16]);
-        let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
+        let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
 
         // check voting for 1 is ok
         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
@@ -983,7 +1025,7 @@
         let storage = InMemoryState {
             persisted_state: state,
         };
-        sk = SafeKeeper::new(ztli, storage, sk.wal_store).unwrap();
+        sk = SafeKeeper::new(ztli, storage, sk.wal_store, ZNodeId(0)).unwrap();
 
         // and ensure voting second time for 1 is not ok
         vote_resp = sk.process_msg(&vote_request);
@@ -1000,7 +1042,7 @@
         };
         let wal_store = DummyWalStore { lsn: Lsn(0) };
         let ztli = ZTimelineId::from([0u8; 16]);
-        let mut sk = SafeKeeper::new(ztli, storage, wal_store).unwrap();
+        let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
 
         let mut ar_hdr = AppendRequestHeader {
             term: 1,
@@ -1023,6 +1065,7 @@
                 term: 1,
                 lsn: Lsn(3),
             }]),
+            timeline_start_lsn: Lsn(0),
         };
         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
             .unwrap();

@@ -102,7 +102,7 @@ impl SharedState {
         let state = SafeKeeperState::new(zttid, peer_ids);
         let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
         let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
-        let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?;
+        let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
 
         Ok(Self {
             notified_commit_lsn: Lsn(0),
@@ -125,7 +125,7 @@ impl SharedState {
 
         Ok(Self {
             notified_commit_lsn: Lsn(0),
-            sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store)?,
+            sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
             replicas: Vec::new(),
             active: false,
             num_computes: 0,

@@ -573,7 +573,9 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
     timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
 
     # fetch something sensible from status
-    epoch = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
+    tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
+    epoch = tli_status.acceptor_epoch
+    timeline_start_lsn = tli_status.timeline_start_lsn
 
     pg.safe_psql("create table t(i int)")
@@ -581,9 +583,13 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
     pg.stop().start()
     pg.safe_psql("insert into t values(10)")
 
-    epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
+    tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
+    epoch_after_reboot = tli_status.acceptor_epoch
     assert epoch_after_reboot > epoch
 
+    # and timeline_start_lsn stays the same
+    assert tli_status.timeline_start_lsn == timeline_start_lsn
+
 
 class SafekeeperEnv:
     def __init__(self,

@@ -1762,6 +1762,7 @@ class SafekeeperTimelineStatus:
     acceptor_epoch: int
     flush_lsn: str
     remote_consistent_lsn: str
+    timeline_start_lsn: str
 
 
 @dataclass
@@ -1786,7 +1787,8 @@ class SafekeeperHttpClient(requests.Session):
         resj = res.json()
         return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
                                         flush_lsn=resj['flush_lsn'],
-                                        remote_consistent_lsn=resj['remote_consistent_lsn'])
+                                        remote_consistent_lsn=resj['remote_consistent_lsn'],
+                                        timeline_start_lsn=resj['timeline_start_lsn'])
 
     def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
         res = self.post(