mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
Add term and http endpoint to broker messaged SkTimelineInfo.
We need them for safekeeper peer recovery https://github.com/neondatabase/neon/pull/4875
This commit is contained in:
@@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SkTimelineInfo {
|
||||
/// Term.
|
||||
pub term: Option<u64>,
|
||||
/// Term of the last entry.
|
||||
pub last_log_term: Option<u64>,
|
||||
/// LSN of the last record.
|
||||
@@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
|
||||
/// A connection string to use for WAL receiving.
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
}
|
||||
|
||||
@@ -921,6 +921,7 @@ mod tests {
|
||||
timeline: SafekeeperTimelineInfo {
|
||||
safekeeper_id: 0,
|
||||
tenant_timeline_id: None,
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 0,
|
||||
commit_lsn,
|
||||
@@ -929,6 +930,7 @@ mod tests {
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
http_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
},
|
||||
latest_update,
|
||||
|
||||
@@ -286,12 +286,14 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: sk_info.term.unwrap_or(0),
|
||||
last_log_term: sk_info.last_log_term.unwrap_or(0),
|
||||
flush_lsn: sk_info.flush_lsn.0,
|
||||
commit_lsn: sk_info.commit_lsn.0,
|
||||
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -562,7 +562,7 @@ where
|
||||
}
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
|
||||
}
|
||||
|
||||
|
||||
@@ -248,8 +248,9 @@ impl SharedState {
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: self.sk.state.acceptor_state.term,
|
||||
last_log_term: self.sk.get_epoch(),
|
||||
flush_lsn: self.sk.wal_store.flush_lsn().0,
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.inmem.commit_lsn.0,
|
||||
remote_consistent_lsn: remote_consistent_lsn.0,
|
||||
@@ -258,6 +259,7 @@ impl SharedState {
|
||||
.advertise_pg_addr
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
|
||||
@@ -125,6 +125,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(counter % n_keys),
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: counter,
|
||||
commit_lsn: 2,
|
||||
@@ -132,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
};
|
||||
|
||||
@@ -22,6 +22,8 @@ message SubscribeSafekeeperInfoRequest {
|
||||
message SafekeeperTimelineInfo {
|
||||
uint64 safekeeper_id = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
// Safekeeper term
|
||||
uint64 term = 12;
|
||||
// Term of the last entry.
|
||||
uint64 last_log_term = 3;
|
||||
// LSN of the last record.
|
||||
@@ -36,6 +38,8 @@ message SafekeeperTimelineInfo {
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
string http_connstr = 13;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -519,6 +519,7 @@ mod tests {
|
||||
tenant_id: vec![0x00; 16],
|
||||
timeline_id,
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
@@ -526,6 +527,7 @@ mod tests {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user