mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
Rename 'wal_producer_connstr' to 'wal_source_connstr'.
What the WAL receiver really connects to is the safekeeper. The "producer" term is a bit misleading, as the safekeeper doesn't produce the WAL, the compute node does. This change also applies to the name of the field used in the mgmt API in in the response of the '/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver' endpoint. AFAICS that's not used anywhere else than one python test, so it should be OK to change it.
This commit is contained in:
@@ -102,7 +102,7 @@ impl TenantConfigRequest {
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct WalReceiverEntry {
|
||||
pub wal_producer_connstr: Option<String>,
|
||||
pub wal_source_connstr: Option<String>,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub last_received_msg_lsn: Option<Lsn>,
|
||||
/// the timestamp (in microseconds) of the last received message
|
||||
|
||||
@@ -694,11 +694,11 @@ components:
|
||||
type: object
|
||||
required:
|
||||
- thread_id
|
||||
- wal_producer_connstr
|
||||
- wal_source_connstr
|
||||
properties:
|
||||
thread_id:
|
||||
type: integer
|
||||
wal_producer_connstr:
|
||||
wal_source_connstr:
|
||||
type: string
|
||||
last_received_msg_lsn:
|
||||
type: string
|
||||
|
||||
@@ -326,7 +326,7 @@ async fn wal_receiver_main_thread_loop_step<'a>(
|
||||
WAL_RECEIVER_ENTRIES.write().await.insert(
|
||||
id,
|
||||
WalReceiverEntry {
|
||||
wal_producer_connstr: None,
|
||||
wal_source_connstr: None,
|
||||
last_received_msg_lsn: None,
|
||||
last_received_msg_ts: None,
|
||||
},
|
||||
|
||||
@@ -168,7 +168,7 @@ async fn connection_manager_loop_step(
|
||||
walreceiver_state
|
||||
.change_connection(
|
||||
new_candidate.safekeeper_id,
|
||||
new_candidate.wal_producer_connstr,
|
||||
new_candidate.wal_source_connstr,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -302,7 +302,7 @@ impl WalreceiverState {
|
||||
}
|
||||
|
||||
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
|
||||
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) {
|
||||
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
|
||||
if let Some(old_connection) = self.wal_connection.take() {
|
||||
old_connection.connection_task.shutdown().await
|
||||
}
|
||||
@@ -324,7 +324,7 @@ impl WalreceiverState {
|
||||
.await;
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
id,
|
||||
&new_wal_producer_connstr,
|
||||
&new_wal_source_connstr,
|
||||
events_sender.as_ref(),
|
||||
cancellation,
|
||||
connect_timeout,
|
||||
@@ -387,7 +387,7 @@ impl WalreceiverState {
|
||||
Some(existing_wal_connection) => {
|
||||
let connected_sk_node = existing_wal_connection.sk_id;
|
||||
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) =
|
||||
self.select_connection_candidate(Some(connected_sk_node))?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
@@ -397,7 +397,7 @@ impl WalreceiverState {
|
||||
if latest_interaciton > self.lagging_wal_timeout {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
wal_source_connstr: new_wal_source_connstr,
|
||||
reason: ReconnectReason::NoWalTimeout {
|
||||
last_wal_interaction: Some(
|
||||
existing_wal_connection.latest_connection_update,
|
||||
@@ -423,7 +423,7 @@ impl WalreceiverState {
|
||||
return Some(
|
||||
NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
wal_source_connstr: new_wal_source_connstr,
|
||||
reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
|
||||
});
|
||||
}
|
||||
@@ -434,18 +434,18 @@ impl WalreceiverState {
|
||||
None => {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
wal_source_connstr: new_wal_source_connstr,
|
||||
reason: ReconnectReason::NoEtcdDataForExistingConnection,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let (new_sk_id, _, new_wal_producer_connstr) =
|
||||
let (new_sk_id, _, new_wal_source_connstr) =
|
||||
self.select_connection_candidate(None)?;
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
wal_source_connstr: new_wal_source_connstr,
|
||||
reason: ReconnectReason::NoExistingConnection,
|
||||
});
|
||||
}
|
||||
@@ -546,7 +546,7 @@ impl WalreceiverState {
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct NewWalConnectionCandidate {
|
||||
safekeeper_id: NodeId,
|
||||
wal_producer_connstr: String,
|
||||
wal_source_connstr: String,
|
||||
reason: ReconnectReason,
|
||||
}
|
||||
|
||||
@@ -803,7 +803,7 @@ mod tests {
|
||||
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
|
||||
);
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
|
||||
let selected_lsn = 100_000;
|
||||
@@ -868,7 +868,7 @@ mod tests {
|
||||
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
|
||||
);
|
||||
assert!(biggest_wal_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
@@ -985,7 +985,7 @@ mod tests {
|
||||
"Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper"
|
||||
);
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
@@ -1067,7 +1067,7 @@ mod tests {
|
||||
"Should select bigger WAL safekeeper if it starts to lag enough"
|
||||
);
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains("advanced by Lsn safekeeper"));
|
||||
|
||||
Ok(())
|
||||
@@ -1134,7 +1134,7 @@ mod tests {
|
||||
unexpected => panic!("Unexpected reason: {unexpected:?}"),
|
||||
}
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
@@ -1190,7 +1190,7 @@ mod tests {
|
||||
unexpected => panic!("Unexpected reason: {unexpected:?}"),
|
||||
}
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.wal_source_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -29,18 +29,18 @@ use crate::{
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
|
||||
|
||||
/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming.
|
||||
/// Open a connection to the given safekeeper and receive WAL, sending back progress
|
||||
/// messages as we go.
|
||||
pub async fn handle_walreceiver_connection(
|
||||
id: ZTenantTimelineId,
|
||||
wal_producer_connstr: &str,
|
||||
wal_source_connstr: &str,
|
||||
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
|
||||
mut cancellation: watch::Receiver<()>,
|
||||
connect_timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
// Connect to the database in replication mode.
|
||||
info!("connecting to {wal_producer_connstr}");
|
||||
let connect_cfg =
|
||||
format!("{wal_producer_connstr} application_name=pageserver replication=true");
|
||||
info!("connecting to {wal_source_connstr}");
|
||||
let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true");
|
||||
|
||||
let (mut replication_client, connection) = time::timeout(
|
||||
connect_timeout,
|
||||
@@ -237,7 +237,7 @@ pub async fn handle_walreceiver_connection(
|
||||
super::WAL_RECEIVER_ENTRIES.write().await.insert(
|
||||
id,
|
||||
WalReceiverEntry {
|
||||
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
|
||||
wal_source_connstr: Some(wal_source_connstr.to_owned()),
|
||||
last_received_msg_lsn: Some(last_lsn),
|
||||
last_received_msg_ts: Some(
|
||||
ts.duration_since(SystemTime::UNIX_EPOCH)
|
||||
|
||||
@@ -65,7 +65,7 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
|
||||
|
||||
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
|
||||
|
||||
assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert empty_response.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
|
||||
@@ -82,7 +82,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
|
||||
# a successful `wal_receiver_get` response must contain the below fields
|
||||
assert list(res.keys()) == [
|
||||
"wal_producer_connstr",
|
||||
"wal_source_connstr",
|
||||
"last_received_msg_lsn",
|
||||
"last_received_msg_ts",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user