mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
Switch to safekeeper in the same AZ (#3883)
Add a condition to switch walreceiver connection to safekeeper that is located in the same availability zone. Switch happens when commit_lsn of a candidate is not less than commit_lsn from the active connection. This condition is expected not to trigger instantly, because commit_lsn of a current connection is usually greater than commit_lsn of updates from the broker. That means that if WAL is written continuously, switch can take a lot of time, but it should happen eventually. Now protoc 3.15+ is required for building neon. Fixes https://github.com/neondatabase/neon/issues/3200
This commit is contained in:
committed by
GitHub
parent
75ffe34b17
commit
814abd9f84
@@ -40,6 +40,8 @@ pacman -S base-devel readline zlib libseccomp openssl clang \
|
||||
postgresql-libs cmake postgresql protobuf
|
||||
```
|
||||
|
||||
Building Neon requires 3.15+ version of `protoc` (protobuf-compiler). If your distribution provides an older version, you can install a newer version from [here](https://github.com/protocolbuffers/protobuf/releases).
|
||||
|
||||
2. [Install Rust](https://www.rust-lang.org/tools/install)
|
||||
```
|
||||
# recommended approach from https://www.rust-lang.org/tools/install
|
||||
|
||||
@@ -237,11 +237,7 @@ async fn connection_manager_loop_step(
|
||||
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
|
||||
info!("Switching to new connection candidate: {new_candidate:?}");
|
||||
walreceiver_state
|
||||
.change_connection(
|
||||
new_candidate.safekeeper_id,
|
||||
new_candidate.wal_source_connconf,
|
||||
ctx,
|
||||
)
|
||||
.change_connection(new_candidate, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -346,6 +342,8 @@ struct WalConnection {
|
||||
started_at: NaiveDateTime,
|
||||
/// Current safekeeper pageserver is connected to for WAL streaming.
|
||||
sk_id: NodeId,
|
||||
/// Availability zone of the safekeeper.
|
||||
availability_zone: Option<String>,
|
||||
/// Status of the connection.
|
||||
status: WalConnectionStatus,
|
||||
/// WAL streaming task handle.
|
||||
@@ -405,12 +403,7 @@ impl WalreceiverState {
|
||||
}
|
||||
|
||||
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
|
||||
async fn change_connection(
|
||||
&mut self,
|
||||
new_sk_id: NodeId,
|
||||
new_wal_source_connconf: PgConnectionConfig,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
let id = self.id;
|
||||
@@ -424,7 +417,7 @@ impl WalreceiverState {
|
||||
async move {
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
new_wal_source_connconf,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
cancellation,
|
||||
connect_timeout,
|
||||
@@ -433,13 +426,16 @@ impl WalreceiverState {
|
||||
.await
|
||||
.context("walreceiver connection handling failure")
|
||||
}
|
||||
.instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id))
|
||||
.instrument(
|
||||
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
|
||||
)
|
||||
});
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
self.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: new_sk_id,
|
||||
sk_id: new_sk.safekeeper_id,
|
||||
availability_zone: new_sk.availability_zone,
|
||||
status: WalConnectionStatus {
|
||||
is_connected: false,
|
||||
has_processed_wal: false,
|
||||
@@ -546,6 +542,7 @@ impl WalreceiverState {
|
||||
/// * if connected safekeeper is not present, pick the candidate
|
||||
/// * if we haven't received any updates for some time, pick the candidate
|
||||
/// * if the candidate commit_lsn is much higher than the current one, pick the candidate
|
||||
/// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
|
||||
/// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
|
||||
///
|
||||
/// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
|
||||
@@ -559,6 +556,7 @@ impl WalreceiverState {
|
||||
|
||||
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
|
||||
self.select_connection_candidate(Some(connected_sk_node))?;
|
||||
let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
if let Ok(latest_interaciton) =
|
||||
@@ -569,6 +567,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::NoKeepAlives {
|
||||
last_keep_alive: Some(
|
||||
existing_wal_connection.status.latest_connection_update,
|
||||
@@ -594,6 +593,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::LaggingWal {
|
||||
current_commit_lsn,
|
||||
new_commit_lsn,
|
||||
@@ -601,6 +601,20 @@ impl WalreceiverState {
|
||||
},
|
||||
});
|
||||
}
|
||||
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
|
||||
// and the current one is not, switch to the new one.
|
||||
if self.availability_zone.is_some()
|
||||
&& existing_wal_connection.availability_zone
|
||||
!= self.availability_zone
|
||||
&& self.availability_zone == new_availability_zone
|
||||
{
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
availability_zone: new_availability_zone,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
reason: ReconnectReason::SwitchAvailabilityZone,
|
||||
});
|
||||
}
|
||||
}
|
||||
None => debug!(
|
||||
"Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
|
||||
@@ -668,6 +682,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::NoWalTimeout {
|
||||
current_lsn,
|
||||
current_commit_lsn,
|
||||
@@ -686,10 +701,11 @@ impl WalreceiverState {
|
||||
self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
|
||||
}
|
||||
None => {
|
||||
let (new_sk_id, _, new_wal_source_connconf) =
|
||||
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
|
||||
self.select_connection_candidate(None)?;
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
reason: ReconnectReason::NoExistingConnection,
|
||||
});
|
||||
@@ -794,6 +810,7 @@ impl WalreceiverState {
|
||||
struct NewWalConnectionCandidate {
|
||||
safekeeper_id: NodeId,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
availability_zone: Option<String>,
|
||||
// This field is used in `derive(Debug)` only.
|
||||
#[allow(dead_code)]
|
||||
reason: ReconnectReason,
|
||||
@@ -808,6 +825,7 @@ enum ReconnectReason {
|
||||
new_commit_lsn: Lsn,
|
||||
threshold: NonZeroU64,
|
||||
},
|
||||
SwitchAvailabilityZone,
|
||||
NoWalTimeout {
|
||||
current_lsn: Lsn,
|
||||
current_commit_lsn: Lsn,
|
||||
@@ -873,6 +891,7 @@ mod tests {
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
},
|
||||
latest_update,
|
||||
}
|
||||
@@ -933,6 +952,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1095,6 +1115,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1160,6 +1181,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: NodeId(1),
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1222,6 +1244,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: NodeId(1),
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
|
||||
discovered_new_wal: Some(NewCommittedWAL {
|
||||
@@ -1289,4 +1312,74 @@ mod tests {
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
|
||||
// Pageserver and one of safekeepers will be in the same availability zone
|
||||
// and pageserver should prefer to connect to it.
|
||||
let test_az = Some("test_az".to_owned());
|
||||
|
||||
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
|
||||
let mut state = dummy_state(&harness).await;
|
||||
state.availability_zone = test_az.clone();
|
||||
let current_lsn = Lsn(100_000).align();
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
let connected_sk_id = NodeId(0);
|
||||
|
||||
let connection_status = WalConnectionStatus {
|
||||
is_connected: true,
|
||||
has_processed_wal: true,
|
||||
latest_connection_update: now,
|
||||
latest_wal_update: now,
|
||||
commit_lsn: Some(current_lsn),
|
||||
streaming_lsn: Some(current_lsn),
|
||||
};
|
||||
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
.send(TaskStateUpdate::Progress(connection_status))
|
||||
.ok();
|
||||
Ok(())
|
||||
}),
|
||||
discovered_new_wal: None,
|
||||
});
|
||||
|
||||
// We have another safekeeper with the same commit_lsn, and it have the same availability zone as
|
||||
// the current pageserver.
|
||||
let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
|
||||
same_az_sk.timeline.availability_zone = test_az.clone();
|
||||
|
||||
state.wal_stream_candidates = HashMap::from([
|
||||
(
|
||||
connected_sk_id,
|
||||
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
|
||||
),
|
||||
(NodeId(1), same_az_sk),
|
||||
]);
|
||||
|
||||
// We expect that pageserver will switch to the safekeeper in the same availability zone,
|
||||
// even if it has the same commit_lsn.
|
||||
let next_candidate = state.next_connection_candidate().expect(
|
||||
"Expected one candidate selected out of multiple valid data options, but got none",
|
||||
);
|
||||
|
||||
assert_eq!(next_candidate.safekeeper_id, NodeId(1));
|
||||
assert_eq!(
|
||||
next_candidate.reason,
|
||||
ReconnectReason::SwitchAvailabilityZone,
|
||||
"Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
|
||||
);
|
||||
assert_eq!(
|
||||
next_candidate.wal_source_connconf.host(),
|
||||
&Host::Domain("same_az".to_owned())
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,6 +242,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
};
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
@@ -337,6 +337,7 @@ impl SharedState {
|
||||
safekeeper_connstr: conf.listen_pg_addr.clone(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
};
|
||||
counter += 1;
|
||||
yield info;
|
||||
|
||||
@@ -36,6 +36,8 @@ message SafekeeperTimelineInfo {
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
message TenantTimelineId {
|
||||
|
||||
@@ -525,6 +525,7 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user