diff --git a/README.md b/README.md index 43f3e3a02b..55df67f6c7 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ pacman -S base-devel readline zlib libseccomp openssl clang \ postgresql-libs cmake postgresql protobuf ``` +Building Neon requires `protoc` (protobuf-compiler) version 3.15 or newer. If your distribution provides an older version, you can install a newer one from the [protobuf releases page](https://github.com/protocolbuffers/protobuf/releases). + 2. [Install Rust](https://www.rust-lang.org/tools/install) ``` # recommended approach from https://www.rust-lang.org/tools/install diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 0c770136db..de07676ffe 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -237,11 +237,7 @@ async fn connection_manager_loop_step( if let Some(new_candidate) = walreceiver_state.next_connection_candidate() { info!("Switching to new connection candidate: {new_candidate:?}"); walreceiver_state - .change_connection( - new_candidate.safekeeper_id, - new_candidate.wal_source_connconf, - ctx, - ) + .change_connection(new_candidate, ctx) .await } } @@ -346,6 +342,8 @@ struct WalConnection { started_at: NaiveDateTime, /// Current safekeeper pageserver is connected to for WAL streaming. sk_id: NodeId, + /// Availability zone of the safekeeper. + availability_zone: Option, /// Status of the connection. status: WalConnectionStatus, /// WAL streaming task handle. @@ -405,12 +403,7 @@ impl WalreceiverState { } /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. 
- async fn change_connection( - &mut self, - new_sk_id: NodeId, - new_wal_source_connconf: PgConnectionConfig, - ctx: &RequestContext, - ) { + async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) { self.drop_old_connection(true).await; let id = self.id; @@ -424,7 +417,7 @@ impl WalreceiverState { async move { super::walreceiver_connection::handle_walreceiver_connection( timeline, - new_wal_source_connconf, + new_sk.wal_source_connconf, events_sender, cancellation, connect_timeout, @@ -433,13 +426,16 @@ impl WalreceiverState { .await .context("walreceiver connection handling failure") } - .instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id)) + .instrument( + info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id), + ) }); let now = Utc::now().naive_utc(); self.wal_connection = Some(WalConnection { started_at: now, - sk_id: new_sk_id, + sk_id: new_sk.safekeeper_id, + availability_zone: new_sk.availability_zone, status: WalConnectionStatus { is_connected: false, has_processed_wal: false, @@ -546,6 +542,7 @@ impl WalreceiverState { /// * if connected safekeeper is not present, pick the candidate /// * if we haven't received any updates for some time, pick the candidate /// * if the candidate commit_lsn is much higher than the current one, pick the candidate + /// * if the candidate commit_lsn is the same, but the candidate is located in the same AZ as the pageserver and the connected safekeeper is not, pick the candidate /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate /// /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently. 
@@ -559,6 +556,7 @@ impl WalreceiverState { let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) = self.select_connection_candidate(Some(connected_sk_node))?; + let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone(); let now = Utc::now().naive_utc(); if let Ok(latest_interaciton) = @@ -569,6 +567,7 @@ impl WalreceiverState { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, wal_source_connconf: new_wal_source_connconf, + availability_zone: new_availability_zone, reason: ReconnectReason::NoKeepAlives { last_keep_alive: Some( existing_wal_connection.status.latest_connection_update, @@ -594,6 +593,7 @@ impl WalreceiverState { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, wal_source_connconf: new_wal_source_connconf, + availability_zone: new_availability_zone, reason: ReconnectReason::LaggingWal { current_commit_lsn, new_commit_lsn, @@ -601,6 +601,20 @@ impl WalreceiverState { }, }); } + // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver, + // and the current one is not, switch to the new one. 
+ if self.availability_zone.is_some() + && existing_wal_connection.availability_zone + != self.availability_zone + && self.availability_zone == new_availability_zone + { + return Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + availability_zone: new_availability_zone, + wal_source_connconf: new_wal_source_connconf, + reason: ReconnectReason::SwitchAvailabilityZone, + }); + } } None => debug!( "Best SK candidate has its commit_lsn behind connected SK's commit_lsn" @@ -668,6 +682,7 @@ impl WalreceiverState { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, wal_source_connconf: new_wal_source_connconf, + availability_zone: new_availability_zone, reason: ReconnectReason::NoWalTimeout { current_lsn, current_commit_lsn, @@ -686,10 +701,11 @@ impl WalreceiverState { self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal; } None => { - let (new_sk_id, _, new_wal_source_connconf) = + let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) = self.select_connection_candidate(None)?; return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, + availability_zone: new_safekeeper_broker_data.availability_zone.clone(), wal_source_connconf: new_wal_source_connconf, reason: ReconnectReason::NoExistingConnection, }); @@ -794,6 +810,7 @@ impl WalreceiverState { struct NewWalConnectionCandidate { safekeeper_id: NodeId, wal_source_connconf: PgConnectionConfig, + availability_zone: Option, // This field is used in `derive(Debug)` only. 
#[allow(dead_code)] reason: ReconnectReason, @@ -808,6 +825,7 @@ enum ReconnectReason { new_commit_lsn: Lsn, threshold: NonZeroU64, }, + SwitchAvailabilityZone, NoWalTimeout { current_lsn: Lsn, current_commit_lsn: Lsn, @@ -873,6 +891,7 @@ mod tests { peer_horizon_lsn: 0, local_start_lsn: 0, safekeeper_connstr: safekeeper_connstr.to_owned(), + availability_zone: None, }, latest_update, } @@ -933,6 +952,7 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: connected_sk_id, + availability_zone: None, status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender @@ -1095,6 +1115,7 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: connected_sk_id, + availability_zone: None, status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender @@ -1160,6 +1181,7 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: NodeId(1), + availability_zone: None, status: connection_status, connection_task: TaskHandle::spawn(move |sender, _| async move { sender @@ -1222,6 +1244,7 @@ mod tests { state.wal_connection = Some(WalConnection { started_at: now, sk_id: NodeId(1), + availability_zone: None, status: connection_status, connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }), discovered_new_wal: Some(NewCommittedWAL { @@ -1289,4 +1312,74 @@ mod tests { availability_zone: None, } } + + #[tokio::test] + async fn switch_to_same_availability_zone() -> anyhow::Result<()> { + // Pageserver and one of safekeepers will be in the same availability zone + // and pageserver should prefer to connect to it. 
+ let test_az = Some("test_az".to_owned()); + + let harness = TenantHarness::create("switch_to_same_availability_zone")?; + let mut state = dummy_state(&harness).await; + state.availability_zone = test_az.clone(); + let current_lsn = Lsn(100_000).align(); + let now = Utc::now().naive_utc(); + + let connected_sk_id = NodeId(0); + + let connection_status = WalConnectionStatus { + is_connected: true, + has_processed_wal: true, + latest_connection_update: now, + latest_wal_update: now, + commit_lsn: Some(current_lsn), + streaming_lsn: Some(current_lsn), + }; + + state.wal_connection = Some(WalConnection { + started_at: now, + sk_id: connected_sk_id, + availability_zone: None, + status: connection_status, + connection_task: TaskHandle::spawn(move |sender, _| async move { + sender + .send(TaskStateUpdate::Progress(connection_status)) + .ok(); + Ok(()) + }), + discovered_new_wal: None, + }); + + // We have another safekeeper with the same commit_lsn, and it is in the same availability zone as + // the current pageserver. + let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now); + same_az_sk.timeline.availability_zone = test_az.clone(); + + state.wal_stream_candidates = HashMap::from([ + ( + connected_sk_id, + dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now), + ), + (NodeId(1), same_az_sk), + ]); + + // We expect that pageserver will switch to the safekeeper in the same availability zone, + // even if it has the same commit_lsn. 
+ let next_candidate = state.next_connection_candidate().expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(next_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + next_candidate.reason, + ReconnectReason::SwitchAvailabilityZone, + "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn" + ); + assert_eq!( + next_candidate.wal_source_connconf.host(), + &Host::Domain("same_az".to_owned()) + ); + + Ok(()) + } } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 14badebd95..cdec45c148 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -242,6 +242,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result, n_keys: u64) { peer_horizon_lsn: 5, safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), local_start_lsn: 0, + availability_zone: None, }; counter += 1; yield info; diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index 1a46896d02..4b2de1a8e5 100644 --- a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -36,9 +36,11 @@ message SafekeeperTimelineInfo { uint64 local_start_lsn = 9; // A connection string to use for WAL receiving. string safekeeper_connstr = 10; + // Availability zone of a safekeeper. + optional string availability_zone = 11; } message TenantTimelineId { bytes tenant_id = 1; bytes timeline_id = 2; -} \ No newline at end of file +} diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index 57f975b0df..d7ace28426 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -525,6 +525,7 @@ mod tests { peer_horizon_lsn: 5, safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(), local_start_lsn: 0, + availability_zone: None, } }