Improve walreceiver logic (#2253)

This patch makes the walreceiver logic more involved, but it should work better in most cases. Added `test_wal_lagging` to cover scenarios where live safekeepers can lag behind other live safekeepers.

- There was a bug where the filter `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())` filtered out all safekeepers in some odd cases. I removed this filter; it should probably help with #2237
- `walreceiver_connection` now reports its status, including `commit_lsn`. This allows keeping the safekeeper connection alive even when etcd is down.
- The safekeeper connection now fails if the pageserver doesn't receive safekeeper messages for some time. A safekeeper usually sends messages at least once per second.
- The `LaggingWal` check now uses `commit_lsn` reported directly by the safekeeper. This fixes the issue with frequent reconnects when the compute generates WAL very quickly.
- `NoWalTimeout` is rewritten to trigger only when we know that new WAL exists and the connected safekeeper doesn't stream it. This allows setting a small `lagging_wal_timeout`, because the timeout fires only when we observe that the connected safekeeper is stuck (see the sketch below).
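
To make the rules above concrete, here is a minimal sketch of the new switching logic, assuming simplified stand-in types (plain `u64` LSNs and `Instant` timestamps) rather than the actual pageserver types shown in the diff below:

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the `WalConnectionStatus` introduced in this commit.
struct ConnStatus {
    is_connected: bool,
    latest_connection_update: Instant, // any safekeeper message counts
    latest_wal_update: Instant,        // only messages that carried WAL bytes
    streaming_lsn: Option<u64>,
    commit_lsn: Option<u64>,
}

enum ReconnectReason {
    NoKeepAlives,
    LaggingWal,
    NoWalTimeout,
}

/// Hypothetical, simplified version of the switching rules.
fn should_switch(
    status: &ConnStatus,
    candidate_commit_lsn: u64, // best other safekeeper's commit_lsn from etcd
    wal_connect_timeout: Duration,
    lagging_wal_timeout: Duration,
    max_lsn_wal_lag: u64,
    now: Instant,
) -> Option<ReconnectReason> {
    // 1. NoKeepAlives: no messages at all from the connected safekeeper for too long.
    if now.duration_since(status.latest_connection_update) > wal_connect_timeout {
        return Some(ReconnectReason::NoKeepAlives);
    }
    if !status.is_connected {
        // Still connecting; give it until the timeout above before switching.
        return None;
    }
    // 2. LaggingWal: another safekeeper has committed much more WAL than the
    //    connected one reports via its own commit_lsn.
    let current_commit_lsn = status.commit_lsn.unwrap_or(0);
    if candidate_commit_lsn.saturating_sub(current_commit_lsn) >= max_lsn_wal_lag {
        return Some(ReconnectReason::LaggingWal);
    }
    // 3. NoWalTimeout: we know newer WAL exists somewhere, but the connected
    //    safekeeper hasn't streamed anything to us for lagging_wal_timeout.
    let newer_wal_exists = status
        .streaming_lsn
        .map_or(true, |lsn| current_commit_lsn.max(candidate_commit_lsn) > lsn);
    if newer_wal_exists && now.duration_since(status.latest_wal_update) > lagging_wal_timeout {
        return Some(ReconnectReason::NoWalTimeout);
    }
    None
}
```

The real `next_connection_candidate` (see the diff) additionally remembers when new WAL was first discovered (`discovered_new_wal`) and tracks per-safekeeper connection attempts, so this sketch only approximates the decision order.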
Arthur Petukhovsky
2022-08-15 13:31:26 +03:00
committed by GitHub
parent 431393e361
commit 116ecdf87a
5 changed files with 380 additions and 174 deletions


@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
}
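
For quick reference, a hypothetical snippet of what these string defaults amount to once parsed; the constant names below are illustrative, not the pageserver's:

```rust
use std::time::Duration;

// Illustrative only: the parsed equivalents of the defaults above.
const WALRECEIVER_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
// Lowered from 10 to 3 seconds in this commit: with the reworked `NoWalTimeout`
// check, a small value only triggers when the connected safekeeper is actually stuck.
const WALRECEIVER_LAGGING_WAL_TIMEOUT: Duration = Duration::from_secs(3);
const MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; // 10 MiB of WAL
```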


@@ -17,7 +17,7 @@ use std::{
};
use anyhow::Context;
use chrono::{DateTime, Local, NaiveDateTime, Utc};
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
BrokerUpdate, Client,
@@ -33,11 +33,10 @@ use crate::{
use crate::{RepositoryImpl, TimelineImpl};
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use super::{TaskEvent, TaskHandle};
use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Spawns the loop to take care of the timeline's WAL streaming connection.
pub(super) fn spawn_connection_manager_task(
@@ -114,21 +113,26 @@ async fn connection_manager_loop_step(
}
} => {
let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
match &wal_connection_update {
match wal_connection_update {
TaskEvent::Started => {
wal_connection.latest_connection_update = Utc::now().naive_utc();
*walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
},
TaskEvent::NewEvent(replication_feedback) => {
wal_connection.latest_connection_update = DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc();
// reset connection attempts here only, the only place where both nodes
// explicitly confirmn with replication feedback that they are connected to each other
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
TaskEvent::NewEvent(status) => {
if status.has_received_wal {
// Reset connection attempts here only, we know that safekeeper is healthy
// because it can send us a WAL update.
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
}
wal_connection.status = status;
},
TaskEvent::End(end_result) => {
match end_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => warn!("WAL receiving task failed: {e}"),
Err(e) => {
warn!("WAL receiving task failed: {e}");
// If the task failed, set the connection attempts to at least 1, to try other safekeepers.
let _ = *walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(1);
}
};
walreceiver_state.wal_connection = None;
},
@@ -257,10 +261,21 @@ struct WalreceiverState {
struct WalConnection {
/// Current safekeeper pageserver is connected to for WAL streaming.
sk_id: NodeId,
/// Connection task start time or the timestamp of a latest connection message received.
latest_connection_update: NaiveDateTime,
/// Status of the connection.
status: WalConnectionStatus,
/// WAL streaming task handle.
connection_task: TaskHandle<ReplicationFeedback>,
connection_task: TaskHandle<WalConnectionStatus>,
/// Have we discovered that other safekeeper has more recent WAL than we do?
discovered_new_wal: Option<NewCommittedWAL>,
}
/// Notion of a new committed WAL, which exists on other safekeeper.
#[derive(Debug, Clone, Copy)]
struct NewCommittedWAL {
/// LSN of the new committed WAL.
lsn: Lsn,
/// When we discovered that the new committed WAL exists on other safekeeper.
discovered_at: NaiveDateTime,
}
/// Data about the timeline to connect to, received from etcd.
@@ -327,10 +342,19 @@ impl WalreceiverState {
.instrument(info_span!("walreceiver_connection", id = %id))
});
let now = Utc::now().naive_utc();
self.wal_connection = Some(WalConnection {
sk_id: new_sk_id,
latest_connection_update: Utc::now().naive_utc(),
status: WalConnectionStatus {
is_connected: false,
has_received_wal: false,
latest_connection_update: now,
latest_wal_update: now,
streaming_lsn: None,
commit_lsn: None,
},
connection_task: connection_handle,
discovered_new_wal: None,
});
}
@@ -361,14 +385,16 @@ impl WalreceiverState {
/// Cleans up stale etcd records and checks the rest for the new connection candidate.
/// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
/// The current rules for approving new candidates:
/// * pick from the input data from etcd for currently connected safekeeper (if any)
/// * out of the rest input entries, pick one with biggest `commit_lsn` that's after than pageserver's latest Lsn for the timeline
/// * pick a candidate different from the connected safekeeper with the biggest `commit_lsn` and the fewest failed connection attempts
/// * if there's no such entry, no new candidate found, abort
/// * check the current connection time data for staleness, reconnect if stale
/// * otherwise, check if etcd updates contain currently connected safekeeper
/// * if not, that means no WAL updates happened after certain time (either none since the connection time or none since the last event after the connection)
/// Reconnect if the time exceeds the threshold.
/// * if there's one, compare its Lsn with the other candidate's, reconnect if candidate's over threshold
/// * otherwise check if the candidate is much better than the current one
///
/// To understand the exact rules for determining whether the candidate is better than the current one, refer to this function's implementation.
/// The general rules are as follows:
/// * if connected safekeeper is not present, pick the candidate
/// * if we haven't received any updates for some time, pick the candidate
/// * if the candidate commit_lsn is much higher than the current one, pick the candidate
/// * if the connected safekeeper stopped sending us new WAL that is available on another safekeeper, pick the candidate
///
/// This way we keep up with the most up-to-date safekeeper without jumping from one safekeeper to another too frequently.
/// Both thresholds are configured per tenant.
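
A small, hypothetical walk-through of the `NoWalTimeout` rule described above, using the default thresholds from the first hunk:

```rust
use std::time::Duration;

fn main() {
    // Default from this commit (illustrative).
    let lagging_wal_timeout = Duration::from_secs(3);

    // The connected safekeeper last streamed WAL up to this LSN...
    let streaming_lsn: u64 = 0x0200_0000;
    // ...while etcd shows another safekeeper with more committed WAL,
    // first discovered 4 seconds ago.
    let candidate_commit_lsn: u64 = 0x0201_0000;
    let waiting_for_new_wal = Duration::from_secs(4);

    // New WAL exists elsewhere and the connected safekeeper has streamed nothing
    // for longer than lagging_wal_timeout: switch (ReconnectReason::NoWalTimeout).
    let should_switch =
        candidate_commit_lsn > streaming_lsn && waiting_for_new_wal > lagging_wal_timeout;
    assert!(should_switch);
}
```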
@@ -384,53 +410,128 @@ impl WalreceiverState {
let now = Utc::now().naive_utc();
if let Ok(latest_interaciton) =
(now - existing_wal_connection.latest_connection_update).to_std()
(now - existing_wal_connection.status.latest_connection_update).to_std()
{
if latest_interaciton > self.lagging_wal_timeout {
// Drop connection if we haven't received keepalive message for a while.
if latest_interaciton > self.wal_connect_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoWalTimeout {
last_wal_interaction: Some(
existing_wal_connection.latest_connection_update,
reason: ReconnectReason::NoKeepAlives {
last_keep_alive: Some(
existing_wal_connection.status.latest_connection_update,
),
check_time: now,
threshold: self.lagging_wal_timeout,
threshold: self.wal_connect_timeout,
},
});
}
}
match self.wal_stream_candidates.get(&connected_sk_node) {
Some(current_connection_etcd_data) => {
let new_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
let current_lsn = current_connection_etcd_data
.timeline
.commit_lsn
.unwrap_or(Lsn(0));
match new_lsn.0.checked_sub(current_lsn.0)
{
Some(new_sk_lsn_advantage) => {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
return Some(
NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
});
}
}
None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"),
if !existing_wal_connection.status.is_connected {
// We haven't connected yet and we shouldn't switch until connection timeout (condition above).
return None;
}
if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
let new_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
// Check if the new candidate has much more WAL than the current one.
match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
Some(new_sk_lsn_advantage) => {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::LaggingWal {
current_commit_lsn,
new_commit_lsn,
threshold: self.max_lsn_wal_lag,
},
});
}
}
None => {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoEtcdDataForExistingConnection,
})
}
None => debug!(
"Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
),
}
}
let current_lsn = match existing_wal_connection.status.streaming_lsn {
Some(lsn) => lsn,
None => self.local_timeline.get_last_record_lsn(),
};
let current_commit_lsn = existing_wal_connection
.status
.commit_lsn
.unwrap_or(current_lsn);
let candidate_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
// Keep discovered_new_wal only if connected safekeeper has not caught up yet.
let mut discovered_new_wal = existing_wal_connection
.discovered_new_wal
.filter(|new_wal| new_wal.lsn > current_commit_lsn);
if discovered_new_wal.is_none() {
// Check if the new candidate has more WAL than the current one.
// If the new candidate has more WAL than the current one, we consider switching to the new candidate.
discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
trace!(
"New candidate has commit_lsn {}, higher than current_commit_lsn {}",
candidate_commit_lsn,
current_commit_lsn
);
Some(NewCommittedWAL {
lsn: candidate_commit_lsn,
discovered_at: Utc::now().naive_utc(),
})
} else {
None
};
}
let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
// Connected safekeeper has more WAL, but we haven't received updates for some time.
trace!(
"Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
(now - existing_wal_connection.status.latest_wal_update).to_std(),
current_lsn,
current_commit_lsn
);
Some(existing_wal_connection.status.latest_wal_update)
} else {
discovered_new_wal.as_ref().map(|new_wal| {
// We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
new_wal
.discovered_at
.max(existing_wal_connection.status.latest_wal_update)
})
};
// If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
if candidate_commit_lsn > current_commit_lsn
&& waiting_for_new_wal > self.lagging_wal_timeout
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoWalTimeout {
current_lsn,
current_commit_lsn,
candidate_commit_lsn,
last_wal_interaction: Some(
existing_wal_connection.status.latest_wal_update,
),
check_time: now,
threshold: self.lagging_wal_timeout,
},
});
}
}
}
self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
}
None => {
let (new_sk_id, _, new_wal_source_connstr) =
@@ -450,7 +551,7 @@ impl WalreceiverState {
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
///
/// The candidate that is chosen:
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time the WAL replication feedback is sent)
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time we receive a WAL message from the node)
/// * has greatest data Lsn among the ones that are left
///
/// NOTE:
@@ -489,14 +590,13 @@ impl WalreceiverState {
.max_by_key(|(_, info, _)| info.commit_lsn)
}
/// Returns a list of safekeepers that have valid info and ready for connection.
fn applicable_connection_candidates(
&self,
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
self.wal_stream_candidates
.iter()
.filter(|(_, etcd_info)| {
etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())
})
.filter(|(_, info)| info.timeline.commit_lsn.is_some())
.filter_map(|(sk_id, etcd_info)| {
let info = &etcd_info.timeline;
match wal_stream_connection_string(
@@ -512,6 +612,7 @@ impl WalreceiverState {
})
}
/// Remove candidates which haven't sent etcd updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
@@ -546,17 +647,24 @@ struct NewWalConnectionCandidate {
#[derive(Debug, PartialEq, Eq)]
enum ReconnectReason {
NoExistingConnection,
NoEtcdDataForExistingConnection,
LaggingWal {
current_lsn: Lsn,
new_lsn: Lsn,
current_commit_lsn: Lsn,
new_commit_lsn: Lsn,
threshold: NonZeroU64,
},
NoWalTimeout {
current_lsn: Lsn,
current_commit_lsn: Lsn,
candidate_commit_lsn: Lsn,
last_wal_interaction: Option<NaiveDateTime>,
check_time: NaiveDateTime,
threshold: Duration,
},
NoKeepAlives {
last_keep_alive: Option<NaiveDateTime>,
check_time: NaiveDateTime,
threshold: Duration,
},
}
fn wal_stream_connection_string(
@@ -580,7 +688,6 @@ fn wal_stream_connection_string(
#[cfg(test)]
mod tests {
use std::time::SystemTime;
use crate::repository::{
repo_harness::{RepoHarness, TIMELINE_ID},
@@ -658,7 +765,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
safekeeper_connstr: None,
},
etcd_version: 0,
latest_update: delay_over_threshold,
@@ -684,22 +791,26 @@ mod tests {
let connected_sk_id = NodeId(0);
let current_lsn = 100_000;
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
latest_connection_update: now,
latest_wal_update: now,
commit_lsn: Some(Lsn(current_lsn)),
streaming_lsn: Some(Lsn(current_lsn)),
};
state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.wal_connection = Some(WalConnection {
sk_id: connected_sk_id,
latest_connection_update: now,
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: 1,
ps_applylsn: current_lsn,
ps_flushlsn: 1,
ps_replytime: SystemTime::now(),
}))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([
(
@@ -924,65 +1035,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("connection_no_etcd_data_candidate")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
let current_lsn = Lsn(100_000).align();
let connected_sk_id = NodeId(0);
let other_sk_id = NodeId(connected_sk_id.0 + 1);
state.wal_connection = Some(WalConnection {
sk_id: connected_sk_id,
latest_connection_update: now,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,
ps_flushlsn: 1,
ps_replytime: SystemTime::now(),
}))
.ok();
Ok(())
}),
});
state.wal_stream_candidates = HashMap::from([(
other_sk_id,
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
latest_update: now,
},
)]);
let only_candidate = state
.next_connection_candidate()
.expect("Expected one candidate selected out of the only data option, but got none");
assert_eq!(only_candidate.safekeeper_id, other_sk_id);
assert_eq!(
only_candidate.reason,
ReconnectReason::NoEtcdDataForExistingConnection,
"Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper"
);
assert!(only_candidate
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
}
#[tokio::test]
async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?;
@@ -993,21 +1045,25 @@ mod tests {
let connected_sk_id = NodeId(0);
let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1);
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
latest_connection_update: now,
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
};
state.wal_connection = Some(WalConnection {
sk_id: connected_sk_id,
latest_connection_update: now,
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,
ps_flushlsn: 1,
ps_replytime: SystemTime::now(),
}))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([
(
@@ -1052,8 +1108,8 @@ mod tests {
assert_eq!(
over_threshcurrent_candidate.reason,
ReconnectReason::LaggingWal {
current_lsn,
new_lsn,
current_commit_lsn: current_lsn,
new_commit_lsn: new_lsn,
threshold: state.max_lsn_wal_lag
},
"Should select bigger WAL safekeeper if it starts to lag enough"
@@ -1066,31 +1122,35 @@ mod tests {
}
#[tokio::test]
async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?;
async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_connection_threshhold_current_candidate")?;
let mut state = dummy_state(&harness);
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
latest_connection_update: time_over_threshold,
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
};
state.wal_connection = Some(WalConnection {
sk_id: NodeId(1),
latest_connection_update: time_over_threshold,
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,
ps_flushlsn: 1,
ps_replytime: SystemTime::now(),
}))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
@@ -1115,12 +1175,12 @@ mod tests {
assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
match over_threshcurrent_candidate.reason {
ReconnectReason::NoWalTimeout {
last_wal_interaction,
ReconnectReason::NoKeepAlives {
last_keep_alive,
threshold,
..
} => {
assert_eq!(last_wal_interaction, Some(time_over_threshold));
assert_eq!(last_keep_alive, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
@@ -1133,20 +1193,34 @@ mod tests {
}
#[tokio::test]
async fn timeout_connection_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_connection_over_threshhold_current_candidate")?;
async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?;
let mut state = dummy_state(&harness);
let current_lsn = Lsn(100_000).align();
let new_lsn = Lsn(100_100).align();
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
let connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: true,
latest_connection_update: now,
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
};
state.wal_connection = Some(WalConnection {
sk_id: NodeId(1),
latest_connection_update: time_over_threshold,
status: connection_status,
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
discovered_new_wal: Some(NewCommittedWAL {
discovered_at: time_over_threshold,
lsn: new_lsn,
}),
});
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
@@ -1154,7 +1228,7 @@ mod tests {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(current_lsn),
commit_lsn: Some(new_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
@@ -1172,10 +1246,16 @@ mod tests {
assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
match over_threshcurrent_candidate.reason {
ReconnectReason::NoWalTimeout {
current_lsn,
current_commit_lsn,
candidate_commit_lsn,
last_wal_interaction,
threshold,
..
} => {
assert_eq!(current_lsn, current_lsn);
assert_eq!(current_commit_lsn, current_lsn);
assert_eq!(candidate_commit_lsn, new_lsn);
assert_eq!(last_wal_interaction, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
}
@@ -1202,7 +1282,7 @@ mod tests {
.expect("Failed to create an empty timeline for dummy wal connection manager"),
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1).unwrap(),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_attempts: HashMap::new(),


@@ -8,6 +8,7 @@ use std::{
use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
@@ -29,12 +30,29 @@ use crate::{
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
/// Status of the connection.
#[derive(Debug, Clone)]
pub struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
pub is_connected: bool,
/// Defines a healthy connection as one on which we have received at least some WAL bytes.
pub has_received_wal: bool,
/// Connection establishment time or the timestamp of a latest connection message received.
pub latest_connection_update: NaiveDateTime,
/// Time of the latest WAL message received.
pub latest_wal_update: NaiveDateTime,
/// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
pub streaming_lsn: Option<Lsn>,
/// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
pub commit_lsn: Option<Lsn>,
}
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_source_connstr: &str,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
events_sender: &watch::Sender<TaskEvent<WalConnectionStatus>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
@@ -49,12 +67,26 @@ pub async fn handle_walreceiver_connection(
.await
.context("Timed out while waiting for walreceiver connection to open")?
.context("Failed to open walreceiver conection")?;
info!("connected!");
let mut connection_status = WalConnectionStatus {
is_connected: true,
has_received_wal: false,
latest_connection_update: Utc::now().naive_utc(),
latest_wal_update: Utc::now().naive_utc(),
streaming_lsn: None,
commit_lsn: None,
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
return Ok(());
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
let mut connection_cancellation = cancellation.clone();
tokio::spawn(
async move {
info!("connected!");
select! {
connection_result = connection => match connection_result{
Ok(()) => info!("Walreceiver db connection closed"),
@@ -84,6 +116,14 @@ pub async fn handle_walreceiver_connection(
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
connection_status.latest_connection_update = Utc::now().naive_utc();
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
// NB: this is a flush_lsn, not a commit_lsn.
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let ZTenantTimelineId {
@@ -118,7 +158,7 @@ pub async fn handle_walreceiver_connection(
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}...");
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
@@ -140,6 +180,33 @@ pub async fn handle_walreceiver_connection(
}
} {
let replication_message = replication_message?;
let now = Utc::now().naive_utc();
// Update the connection status before processing the message. If the message processing
// fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
match &replication_message {
ReplicationMessage::XLogData(xlog_data) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
xlog_data.wal_start() + xlog_data.data().len() as u64,
));
if !xlog_data.data().is_empty() {
connection_status.latest_wal_update = now;
connection_status.has_received_wal = true;
}
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
@@ -257,10 +324,6 @@ pub async fn handle_walreceiver_connection(
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
}
}
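
For context, a hypothetical, simplified sketch of the receiving side of this watch channel; the real consumer is `connection_manager_loop_step`, shown earlier, and the stand-in types below only approximate the walreceiver's own:

```rust
use tokio::sync::watch;

// Simplified stand-ins for the walreceiver's task-event types.
#[derive(Clone, Debug)]
enum TaskEvent<E> {
    Started,
    NewEvent(E),
    End(Result<(), String>),
}

async fn log_status_updates(mut events: watch::Receiver<TaskEvent<String>>) {
    // `changed()` resolves whenever the connection task publishes a new event.
    while events.changed().await.is_ok() {
        match events.borrow().clone() {
            TaskEvent::Started => println!("walreceiver connection task started"),
            TaskEvent::NewEvent(status) => println!("status update: {status}"),
            TaskEvent::End(result) => {
                println!("walreceiver connection task finished: {result:?}");
                break;
            }
        }
    }
}
```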


@@ -1090,11 +1090,9 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# Remove initial tenant fully (two branches are active)
response = sk_http.tenant_delete_force(tenant_id)
assert response == {
timeline_id_3: {
"dir_existed": True,
"was_active": True,
}
assert response[timeline_id_3] == {
"dir_existed": True,
"was_active": True,
}
assert not (sk_data_dir / tenant_id).exists()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()


@@ -520,3 +520,68 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder):
pg = env.postgres.create_start('test_safekeepers_race_conditions')
asyncio.run(run_race_conditions(env, pg))
# Check that pageserver can select safekeeper with largest commit_lsn
# and switch if LSN is not updated for some time (NoWalTimeout).
async def run_wal_lagging(env: NeonEnv, pg: Postgres):
def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
# use ports 10, 11 and 12 to simulate unavailable safekeepers
return ','.join([
f'localhost:{sk.port.pg if active else 10 + i}'
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk))
])
conn = await pg.connect_async()
await conn.execute('CREATE TABLE t(key int primary key, value text)')
await conn.close()
pg.stop()
n_iterations = 20
n_txes = 10000
expected_sum = 0
i = 1
quorum = len(env.safekeepers) // 2 + 1
for it in range(n_iterations):
active_sk = list(map(lambda _: random.random() >= 0.5, env.safekeepers))
active_count = sum(active_sk)
if active_count < quorum:
it -= 1
continue
pg.adjust_for_safekeepers(safekeepers_guc(env, active_sk))
log.info(f'Iteration {it}: {active_sk}')
pg.start()
conn = await pg.connect_async()
for _ in range(n_txes):
await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
expected_sum += i
i += 1
await conn.close()
pg.stop()
pg.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers)))
pg.start()
conn = await pg.connect_async()
log.info(f'Executed {i-1} queries')
res = await conn.fetchval('SELECT sum(key) FROM t')
assert res == expected_sum
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_wal_lagging')
pg = env.postgres.create_start('test_wal_lagging')
asyncio.run(run_wal_lagging(env, pg))