From d903dd61bd243d37d086f50df19eae5d619b4b40 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 29 Jul 2022 09:09:22 +0300 Subject: [PATCH] Rename 'wal_producer_connstr' to 'wal_source_connstr'. What the WAL receiver really connects to is the safekeeper. The "producer" term is a bit misleading, as the safekeeper doesn't produce the WAL, the compute node does. This change also applies to the name of the field used in the mgmt API in in the response of the '/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver' endpoint. AFAICS that's not used anywhere else than one python test, so it should be OK to change it. --- pageserver/src/http/models.rs | 2 +- pageserver/src/http/openapi_spec.yml | 4 +-- pageserver/src/walreceiver.rs | 2 +- .../src/walreceiver/connection_manager.rs | 32 +++++++++---------- .../src/walreceiver/walreceiver_connection.rs | 12 +++---- .../batch_others/test_pageserver_api.py | 4 +-- 6 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index c947cebcb6..dc9db3a62f 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -102,7 +102,7 @@ impl TenantConfigRequest { #[serde_as] #[derive(Debug, Serialize, Deserialize, Clone)] pub struct WalReceiverEntry { - pub wal_producer_connstr: Option, + pub wal_source_connstr: Option, #[serde_as(as = "Option")] pub last_received_msg_lsn: Option, /// the timestamp (in microseconds) of the last received message diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index ed190db43a..2109fcbe5b 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -694,11 +694,11 @@ components: type: object required: - thread_id - - wal_producer_connstr + - wal_source_connstr properties: thread_id: type: integer - wal_producer_connstr: + wal_source_connstr: type: string last_received_msg_lsn: type: string diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index c36343db17..946230b0d3 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -326,7 +326,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( WAL_RECEIVER_ENTRIES.write().await.insert( id, WalReceiverEntry { - wal_producer_connstr: None, + wal_source_connstr: None, last_received_msg_lsn: None, last_received_msg_ts: None, }, diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index f2b1671eb4..ae1c787517 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -168,7 +168,7 @@ async fn connection_manager_loop_step( walreceiver_state .change_connection( new_candidate.safekeeper_id, - new_candidate.wal_producer_connstr, + new_candidate.wal_source_connstr, ) .await } @@ -302,7 +302,7 @@ impl WalreceiverState { } /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) { + async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) { if let Some(old_connection) = self.wal_connection.take() { old_connection.connection_task.shutdown().await } @@ -324,7 +324,7 @@ impl WalreceiverState { .await; super::walreceiver_connection::handle_walreceiver_connection( id, - &new_wal_producer_connstr, + &new_wal_source_connstr, events_sender.as_ref(), cancellation, connect_timeout, @@ -387,7 +387,7 @@ impl WalreceiverState { Some(existing_wal_connection) => { let connected_sk_node = existing_wal_connection.sk_id; - let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = + let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) = self.select_connection_candidate(Some(connected_sk_node))?; let now = Utc::now().naive_utc(); @@ -397,7 +397,7 @@ impl WalreceiverState { if latest_interaciton > self.lagging_wal_timeout { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoWalTimeout { last_wal_interaction: Some( existing_wal_connection.latest_connection_update, @@ -423,7 +423,7 @@ impl WalreceiverState { return Some( NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, }); } @@ -434,18 +434,18 @@ impl WalreceiverState { None => { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoEtcdDataForExistingConnection, }) } } } None => { - let (new_sk_id, _, new_wal_producer_connstr) = + let (new_sk_id, _, new_wal_source_connstr) = self.select_connection_candidate(None)?; return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoExistingConnection, }); } @@ -546,7 +546,7 @@ impl WalreceiverState { #[derive(Debug, PartialEq, Eq)] struct NewWalConnectionCandidate { safekeeper_id: NodeId, - wal_producer_connstr: String, + wal_source_connstr: String, reason: ReconnectReason, } @@ -803,7 +803,7 @@ mod tests { "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); assert!(only_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); let selected_lsn = 100_000; @@ -868,7 +868,7 @@ mod tests { "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); assert!(biggest_wal_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -985,7 +985,7 @@ mod tests { "Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper" ); assert!(only_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -1067,7 +1067,7 @@ mod tests { "Should select bigger WAL safekeeper if it starts to lag enough" ); assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains("advanced by Lsn safekeeper")); Ok(()) @@ -1134,7 +1134,7 @@ mod tests { unexpected => panic!("Unexpected reason: {unexpected:?}"), } assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -1190,7 +1190,7 @@ mod tests { unexpected => panic!("Unexpected reason: {unexpected:?}"), } assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index ca29c00771..8846e27256 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -29,18 +29,18 @@ use crate::{ use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; -/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming. +/// Open a connection to the given safekeeper and receive WAL, sending back progress +/// messages as we go. pub async fn handle_walreceiver_connection( id: ZTenantTimelineId, - wal_producer_connstr: &str, + wal_source_connstr: &str, events_sender: &watch::Sender>, mut cancellation: watch::Receiver<()>, connect_timeout: Duration, ) -> anyhow::Result<()> { // Connect to the database in replication mode. - info!("connecting to {wal_producer_connstr}"); - let connect_cfg = - format!("{wal_producer_connstr} application_name=pageserver replication=true"); + info!("connecting to {wal_source_connstr}"); + let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true"); let (mut replication_client, connection) = time::timeout( connect_timeout, @@ -237,7 +237,7 @@ pub async fn handle_walreceiver_connection( super::WAL_RECEIVER_ENTRIES.write().await.insert( id, WalReceiverEntry { - wal_producer_connstr: Some(wal_producer_connstr.to_owned()), + wal_source_connstr: Some(wal_source_connstr.to_owned()), last_received_msg_lsn: Some(last_lsn), last_received_msg_ts: Some( ts.duration_since(SystemTime::UNIX_EPOCH) diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 7f9cb9493d..a298f1d701 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -65,7 +65,7 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): empty_response = client.wal_receiver_get(tenant_id, timeline_id) - assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert empty_response.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' @@ -82,7 +82,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): # a successful `wal_receiver_get` response must contain the below fields assert list(res.keys()) == [ - "wal_producer_connstr", + "wal_source_connstr", "last_received_msg_lsn", "last_received_msg_ts", ]