Mirror of https://github.com/neondatabase/neon.git

Commit 1d16ee92d4: Fix the Lsn difference reconnection
Committed by: Kirill Bulatov
Parent: 7933804284
@@ -37,7 +37,7 @@ pub mod defaults {
     pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
     pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
     pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
-    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000;
+    pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
 }

 /// Per-tenant configuration options
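The duration-style defaults above are strings that get parsed into real `Duration` values when tenant configuration is loaded, while `DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG` is a plain Lsn distance (bytes of WAL) that a candidate safekeeper must be ahead before a reconnection is considered. A minimal sketch of the string-to-duration idea; the `parse_seconds` helper is hypothetical, not the pageserver's actual parser (which may rely on a crate such as humantime):

```rust
// Hypothetical stand-in for the config layer: turn a default such as
// "10 seconds" into a std::time::Duration. The real parsing code may differ.
use std::time::Duration;

fn parse_seconds(value: &str) -> Option<Duration> {
    let mut parts = value.split_whitespace();
    // Leading integer, e.g. "10" out of "10 seconds".
    let n: u64 = parts.next()?.parse().ok()?;
    match parts.next()? {
        "second" | "seconds" => Some(Duration::from_secs(n)),
        _ => None,
    }
}

fn main() {
    // DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT maps to a 10-second Duration.
    assert_eq!(parse_seconds("10 seconds"), Some(Duration::from_secs(10)));
    assert_eq!(parse_seconds("2 seconds"), Some(Duration::from_secs(2)));
}
```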
@@ -478,7 +478,11 @@ async fn timeline_wal_broker_loop_step(
                 None => debug!("No connection candidate was selected for timeline"),
             }
         }
-        None => warn!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()),
+        // XXX: If we subscribe for a certain timeline, we expect only its data to come.
+        // But somebody could propagate a new etcd key that has the same prefix as the subscribed one, and then we'll get odd data.
+        // This is an error, we don't want to have overlapping prefixes for timelines, but we can complain and throw those away instead of panicking,
+        // since the next poll might bring the correct data.
+        None => error!("Timeline has an active broker subscription, but got no updates. Other data length: {}", all_timeline_updates.len()),
     }
 },
 None => {
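The new comment block names a prefix-subscription hazard: a watch on one timeline's etcd key prefix can also match unrelated keys that merely share the prefix (and the log level is raised from `warn!` to `error!` to match that stance). A small sketch of the failure mode, with an invented key layout; the broker's real schema may differ:

```rust
// Illustration only: the key layout below is invented, not the broker's
// actual schema. A bare starts_with("timeline/1") also matches "timeline/10",
// which is exactly the "odd data" the comment above warns about.
fn belongs_to(prefix: &str, key: &str) -> bool {
    // Demand a separator right after the prefix instead of a raw prefix match.
    key == prefix
        || key
            .strip_prefix(prefix)
            .map_or(false, |rest| rest.starts_with('/'))
}

fn main() {
    assert!(belongs_to("timeline/1", "timeline/1/safekeeper/3"));
    assert!(!belongs_to("timeline/1", "timeline/10/safekeeper/3"));
}
```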
@@ -625,18 +629,28 @@ impl WalConnectionManager
     /// Checks current state against every fetched safekeeper state of a given timeline.
     /// Returns a new candidate, if the current state is somewhat lagging, or `None` otherwise.
     /// The current rules for approving new candidates:
-    /// * pick the safekeeper with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline
-    /// * if the leader is a different SK and either
-    ///   * no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection) — reconnect
-    ///   * same time amount had passed since the connection, WAL updates happened recently, but the new leader SK has timeline Lsn way ahead of the old one — reconnect
+    /// * pick the currently connected safekeeper's entry out of the etcd input data (if any)
+    /// * out of the remaining input entries, pick the one with the biggest `commit_lsn` that's ahead of the pageserver's latest Lsn for the timeline
+    ///   * if there's no such entry, no new candidate is found, abort
+    ///   * otherwise, check whether the etcd updates contain the currently connected safekeeper
+    ///     * if not, that means no WAL updates happened after a certain time (either none since the connection time or none since the last event after the connection).
+    ///       Reconnect if the time exceeds the threshold.
+    ///     * if there's one, compare its Lsn with the candidate's, and reconnect if the candidate's advantage is over the threshold
     ///
     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     /// Both thresholds are configured per tenant.
     fn select_connection_candidate(
         &self,
-        safekeeper_timelines: HashMap<NodeId, SkTimelineInfo>,
+        mut safekeeper_timelines: HashMap<NodeId, SkTimelineInfo>,
     ) -> Option<NewWalConnectionCandidate> {
-        let (&new_sk_id, new_sk_timeline, new_wal_producer_connstr) = safekeeper_timelines
+        let current_sk_data_updated =
+            self.wal_connection_data
+                .as_ref()
+                .and_then(|connection_data| {
+                    safekeeper_timelines.remove(&connection_data.safekeeper_id)
+                });
+
+        let candidate_sk_data = safekeeper_timelines
             .iter()
             .filter(|(_, info)| {
                 info.commit_lsn > Some(self.timeline.tline.get_last_record_lsn())
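Stripped of the surrounding types, the filter above implements the rule "biggest `commit_lsn` strictly ahead of the pageserver's last record Lsn". A self-contained sketch with bare `u64`s standing in for `Lsn` and `SkTimelineInfo`:

```rust
use std::collections::HashMap;

// Simplified stand-ins: node ids and Lsns as bare u64s.
type NodeId = u64;
type Lsn = u64;

/// Pick the safekeeper with the biggest commit_lsn that is ahead of the
/// pageserver's last record Lsn, mirroring the filter + max_by_key above.
fn best_candidate(last_record_lsn: Lsn, commit_lsns: &HashMap<NodeId, Lsn>) -> Option<(NodeId, Lsn)> {
    commit_lsns
        .iter()
        .filter(|(_, &commit_lsn)| commit_lsn > last_record_lsn)
        .max_by_key(|(_, &commit_lsn)| commit_lsn)
        .map(|(&id, &lsn)| (id, lsn))
}

fn main() {
    let sks = HashMap::from([(0, 90_u64), (1, 120), (2, 150)]);
    assert_eq!(best_candidate(100, &sks), Some((2, 150)));
    assert_eq!(best_candidate(200, &sks), None); // nobody is ahead
}
```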
@@ -654,68 +668,78 @@ impl WalConnectionManager {
                     }
                 }
             })
-            .max_by_key(|(_, info, _)| info.commit_lsn)?;
+            .max_by_key(|(_, info, _)| info.commit_lsn);

-        match self.wal_connection_data.as_ref() {
-            None => Some(NewWalConnectionCandidate {
-                safekeeper_id: new_sk_id,
-                wal_producer_connstr: new_wal_producer_connstr,
-                reason: ReconnectReason::NoExistingConnection,
-            }),
-            Some(current_connection) => {
-                if current_connection.safekeeper_id == new_sk_id {
-                    None
-                } else {
-                    self.reason_to_reconnect(current_connection, new_sk_timeline)
-                        .map(|reason| NewWalConnectionCandidate {
-                            safekeeper_id: new_sk_id,
-                            wal_producer_connstr: new_wal_producer_connstr,
-                            reason,
-                        })
+        match (current_sk_data_updated, candidate_sk_data) {
+            // No better candidate than the one we're already connected to:
+            // whatever data update comes for the connected one, we don't have a better candidate
+            (_, None) => None,
+
+            // No updates from the old SK in this batch, but some candidate is available:
+            // check how long ago we received updates from the current SK, and switch connections in case it's over the threshold
+            (None, Some((&new_sk_id, _, new_wal_producer_connstr))) => {
+                match self.wal_connection_data.as_ref() {
+                    Some(current_connection) => {
+                        let last_sk_interaction_time =
+                            match current_connection.last_wal_receiver_data.as_ref() {
+                                Some((_, data_submission_time)) => *data_submission_time,
+                                None => current_connection.connection_init_time,
+                            };
+
+                        let now = Utc::now().naive_utc();
+                        match (now - last_sk_interaction_time).to_std() {
+                            Ok(last_interaction) => {
+                                if last_interaction > self.lagging_wal_timeout {
+                                    return Some(NewWalConnectionCandidate {
+                                        safekeeper_id: new_sk_id,
+                                        wal_producer_connstr: new_wal_producer_connstr,
+                                        reason: ReconnectReason::NoWalTimeout {
+                                            last_wal_interaction: last_sk_interaction_time,
+                                            check_time: now,
+                                            threshold: self.lagging_wal_timeout,
+                                        },
+                                    });
+                                }
+                            }
+                            Err(_e) => {
+                                warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. Interaction time: {last_sk_interaction_time}, now: {now}", current_connection.safekeeper_id);
+                            }
+                        }
+                        None
+                    }
+                    None => Some(NewWalConnectionCandidate {
+                        safekeeper_id: new_sk_id,
+                        wal_producer_connstr: new_wal_producer_connstr,
+                        reason: ReconnectReason::NoExistingConnection,
+                    }),
                 }
             }
-        }
-    }

-    fn reason_to_reconnect(
-        &self,
-        current_connection: &WalConnectionData,
-        new_sk_timeline: &SkTimelineInfo,
-    ) -> Option<ReconnectReason> {
-        let last_sk_interaction_time = match current_connection.last_wal_receiver_data.as_ref() {
-            Some((last_wal_receiver_data, data_submission_time)) => {
-                let new_lsn = new_sk_timeline.commit_lsn?;
-                match new_lsn.0.checked_sub(last_wal_receiver_data.ps_writelsn)
+            // Both the current SK got updated via etcd and there's another candidate with a suitable Lsn:
+            // check how far ahead the new SK's Lsn is compared to the current SK's, and switch connections in case it's over the threshold
+            (
+                Some(current_sk_timeline),
+                Some((&new_sk_id, new_sk_timeline, new_wal_producer_connstr)),
+            ) => {
+                let new_lsn = new_sk_timeline.commit_lsn.unwrap_or(Lsn(0));
+                let current_lsn = current_sk_timeline.commit_lsn.unwrap_or(Lsn(0));
+                match new_lsn.0.checked_sub(current_lsn.0)
                 {
-                    Some(sk_lsn_advantage) => {
-                        if sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
-                            return Some(ReconnectReason::LaggingWal { current_lsn: Lsn(last_wal_receiver_data.ps_writelsn), new_lsn, threshold: self.max_lsn_wal_lag });
+                    Some(new_sk_lsn_advantage) => {
+                        if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
+                            return Some(NewWalConnectionCandidate {
+                                safekeeper_id: new_sk_id,
+                                wal_producer_connstr: new_wal_producer_connstr,
+                                reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
+                            });
                         }
                     }
                     None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"),
                 }
-                *data_submission_time
-            }
-            None => current_connection.connection_init_time,
-        };
-
-        let now = Utc::now().naive_utc();
-        match (now - last_sk_interaction_time).to_std() {
-            Ok(last_interaction) => {
-                if last_interaction > self.lagging_wal_timeout {
-                    return Some(ReconnectReason::NoWalTimeout {
-                        last_wal_interaction: last_sk_interaction_time,
-                        check_time: now,
-                        threshold: self.lagging_wal_timeout,
-                    });
-                }
-            }
-            Err(_e) => {
-                warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. Interaction time: {last_sk_interaction_time}, now: {now}",
-                current_connection.safekeeper_id);
-            }
-        }
-        None
-    }
+                None
+            }
+        }
+    }
 }
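In the `LaggingWal` arm above, `checked_sub` does double duty: a candidate that is actually behind yields `None` (never a reason to switch) and `u64` underflow is impossible. A condensed sketch of just that rule, again with bare `u64`s for `Lsn`:

```rust
/// true iff the candidate safekeeper is far enough ahead to justify switching
/// connections (mirrors the checked_sub + threshold test in the diff above).
fn lagging_wal(current_lsn: u64, new_lsn: u64, max_lsn_wal_lag: u64) -> bool {
    match new_lsn.checked_sub(current_lsn) {
        Some(advantage) => advantage >= max_lsn_wal_lag,
        None => false, // candidate is behind: underflow avoided, never switch
    }
}

fn main() {
    assert!(lagging_wal(100_000, 120_000, 10_000)); // 20_000 >= 10_000
    assert!(!lagging_wal(100_000, 105_000, 10_000)); // within tolerated lag
    assert!(!lagging_wal(100_000, 90_000, 10_000)); // behind the current SK
}
```

Note the baseline change as well: the deleted `reason_to_reconnect` measured the candidate against the pageserver's `ps_writelsn`, while the new arm compares the two safekeepers' `commit_lsn` values directly, which is the Lsn-difference fix the commit message refers to.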
@@ -1017,7 +1041,7 @@ mod tests {
         let mut data_manager_with_connection = dummy_wal_connection_manager(&harness);
         let connected_sk_id = NodeId(0);
-        let mut dummy_connection_data = dummy_connection_data(id, NodeId(0)).await;
+        let mut dummy_connection_data = dummy_connection_data(id, connected_sk_id).await;
         let lagging_wal_timeout =
             chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?;
         let time_over_threshold =
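This test drives the `NoWalTimeout` path, whose core check subtracts two chrono `NaiveDateTime`s and converts the result with `.to_std()`; that conversion fails for a negative difference (an interaction timestamp "in the future"), which the manager only logs. A minimal sketch, assuming the same chrono types as the surrounding code:

```rust
use std::time::Duration;

/// Mirrors the elapsed-time check above: reconnect only when the last
/// safekeeper interaction is older than the lagging-WAL timeout.
fn timed_out(
    last_interaction: chrono::NaiveDateTime,
    now: chrono::NaiveDateTime,
    lagging_wal_timeout: Duration,
) -> bool {
    match (now - last_interaction).to_std() {
        Ok(elapsed) => elapsed > lagging_wal_timeout,
        Err(_) => false, // "in the future": log-and-ignore in the real code
    }
}

fn main() {
    let now = chrono::Utc::now().naive_utc();
    let earlier = now - chrono::Duration::seconds(30);
    assert!(timed_out(earlier, now, Duration::from_secs(10)));
    assert!(!timed_out(earlier, now, Duration::from_secs(60)));
}
```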
@@ -1092,8 +1116,8 @@ mod tests {
     }

     #[tokio::test]
-    async fn timeout_wal_over_threshcurrent_candidate() -> anyhow::Result<()> {
-        let harness = RepoHarness::create("timeout_wal_over_threshcurrent_candidate")?;
+    async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
+        let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?;
         let current_lsn = Lsn(100_000).align();

         let id = ZTenantTimelineId {
@@ -1111,36 +1135,20 @@ mod tests {
         dummy_connection_data.connection_init_time = time_over_threshold;
         data_manager_with_connection.wal_connection_data = Some(dummy_connection_data);

-        let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1);
         let over_threshcurrent_candidate = data_manager_with_connection
-            .select_connection_candidate(HashMap::from([
-                (
-                    NodeId(0),
-                    SkTimelineInfo {
-                        last_log_term: None,
-                        flush_lsn: None,
-                        commit_lsn: Some(new_lsn),
-                        backup_lsn: None,
-                        remote_consistent_lsn: None,
-                        peer_horizon_lsn: None,
-                        safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                        pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
-                    },
-                ),
-                (
-                    NodeId(1),
-                    SkTimelineInfo {
-                        last_log_term: None,
-                        flush_lsn: None,
-                        commit_lsn: Some(current_lsn),
-                        backup_lsn: None,
-                        remote_consistent_lsn: None,
-                        peer_horizon_lsn: None,
-                        safekeeper_connstr: Some("not advanced by Lsn safekeeper".to_string()),
-                        pageserver_connstr: Some("not advanced by Lsn safekeeper".to_string()),
-                    },
-                ),
-            ]))
+            .select_connection_candidate(HashMap::from([(
+                NodeId(0),
+                SkTimelineInfo {
+                    last_log_term: None,
+                    flush_lsn: None,
+                    commit_lsn: Some(current_lsn),
+                    backup_lsn: None,
+                    remote_consistent_lsn: None,
+                    peer_horizon_lsn: None,
+                    safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
+                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
+                },
+            )]))
             .expect(
                 "Expected one candidate selected out of multiple valid data options, but got none",
             );