diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index c947cebcb6..dc9db3a62f 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -102,7 +102,7 @@ impl TenantConfigRequest { #[serde_as] #[derive(Debug, Serialize, Deserialize, Clone)] pub struct WalReceiverEntry { - pub wal_producer_connstr: Option, + pub wal_source_connstr: Option, #[serde_as(as = "Option")] pub last_received_msg_lsn: Option, /// the timestamp (in microseconds) of the last received message diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index ed190db43a..2109fcbe5b 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -694,11 +694,11 @@ components: type: object required: - thread_id - - wal_producer_connstr + - wal_source_connstr properties: thread_id: type: integer - wal_producer_connstr: + wal_source_connstr: type: string last_received_msg_lsn: type: string diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index c36343db17..946230b0d3 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -326,7 +326,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( WAL_RECEIVER_ENTRIES.write().await.insert( id, WalReceiverEntry { - wal_producer_connstr: None, + wal_source_connstr: None, last_received_msg_lsn: None, last_received_msg_ts: None, }, diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index f2b1671eb4..ae1c787517 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -168,7 +168,7 @@ async fn connection_manager_loop_step( walreceiver_state .change_connection( new_candidate.safekeeper_id, - new_candidate.wal_producer_connstr, + new_candidate.wal_source_connstr, ) .await } @@ -302,7 +302,7 @@ impl WalreceiverState { } /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) { + async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) { if let Some(old_connection) = self.wal_connection.take() { old_connection.connection_task.shutdown().await } @@ -324,7 +324,7 @@ impl WalreceiverState { .await; super::walreceiver_connection::handle_walreceiver_connection( id, - &new_wal_producer_connstr, + &new_wal_source_connstr, events_sender.as_ref(), cancellation, connect_timeout, @@ -387,7 +387,7 @@ impl WalreceiverState { Some(existing_wal_connection) => { let connected_sk_node = existing_wal_connection.sk_id; - let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = + let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) = self.select_connection_candidate(Some(connected_sk_node))?; let now = Utc::now().naive_utc(); @@ -397,7 +397,7 @@ impl WalreceiverState { if latest_interaciton > self.lagging_wal_timeout { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoWalTimeout { last_wal_interaction: Some( existing_wal_connection.latest_connection_update, @@ -423,7 +423,7 @@ impl WalreceiverState { return Some( NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag }, }); } @@ -434,18 +434,18 @@ impl WalreceiverState { None => { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoEtcdDataForExistingConnection, }) } } } None => { - let (new_sk_id, _, new_wal_producer_connstr) = + let (new_sk_id, _, new_wal_source_connstr) = self.select_connection_candidate(None)?; return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_producer_connstr: new_wal_producer_connstr, + wal_source_connstr: new_wal_source_connstr, reason: ReconnectReason::NoExistingConnection, }); } @@ -546,7 +546,7 @@ impl WalreceiverState { #[derive(Debug, PartialEq, Eq)] struct NewWalConnectionCandidate { safekeeper_id: NodeId, - wal_producer_connstr: String, + wal_source_connstr: String, reason: ReconnectReason, } @@ -803,7 +803,7 @@ mod tests { "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); assert!(only_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); let selected_lsn = 100_000; @@ -868,7 +868,7 @@ mod tests { "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); assert!(biggest_wal_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -985,7 +985,7 @@ mod tests { "Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper" ); assert!(only_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -1067,7 +1067,7 @@ mod tests { "Should select bigger WAL safekeeper if it starts to lag enough" ); assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains("advanced by Lsn safekeeper")); Ok(()) @@ -1134,7 +1134,7 @@ mod tests { unexpected => panic!("Unexpected reason: {unexpected:?}"), } assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) @@ -1190,7 +1190,7 @@ mod tests { unexpected => panic!("Unexpected reason: {unexpected:?}"), } assert!(over_threshcurrent_candidate - .wal_producer_connstr + .wal_source_connstr .contains(DUMMY_SAFEKEEPER_CONNSTR)); Ok(()) diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index ca29c00771..8846e27256 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -29,18 +29,18 @@ use crate::{ use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; -/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming. +/// Open a connection to the given safekeeper and receive WAL, sending back progress +/// messages as we go. pub async fn handle_walreceiver_connection( id: ZTenantTimelineId, - wal_producer_connstr: &str, + wal_source_connstr: &str, events_sender: &watch::Sender>, mut cancellation: watch::Receiver<()>, connect_timeout: Duration, ) -> anyhow::Result<()> { // Connect to the database in replication mode. - info!("connecting to {wal_producer_connstr}"); - let connect_cfg = - format!("{wal_producer_connstr} application_name=pageserver replication=true"); + info!("connecting to {wal_source_connstr}"); + let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true"); let (mut replication_client, connection) = time::timeout( connect_timeout, @@ -237,7 +237,7 @@ pub async fn handle_walreceiver_connection( super::WAL_RECEIVER_ENTRIES.write().await.insert( id, WalReceiverEntry { - wal_producer_connstr: Some(wal_producer_connstr.to_owned()), + wal_source_connstr: Some(wal_source_connstr.to_owned()), last_received_msg_lsn: Some(last_lsn), last_received_msg_ts: Some( ts.duration_since(SystemTime::UNIX_EPOCH) diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 7f9cb9493d..a298f1d701 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -65,7 +65,7 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): empty_response = client.wal_receiver_get(tenant_id, timeline_id) - assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert empty_response.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' @@ -82,7 +82,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): # a successful `wal_receiver_get` response must contain the below fields assert list(res.keys()) == [ - "wal_producer_connstr", + "wal_source_connstr", "last_received_msg_lsn", "last_received_msg_ts", ]