Remove pageserver_connstr from WAL stream logic

Kirill Bulatov
2022-06-02 23:26:28 +03:00
committed by Kirill Bulatov
parent 70a53c4b03
commit 2623193876
6 changed files with 13 additions and 116 deletions

View File

@@ -57,8 +57,6 @@ pub struct SkTimelineInfo {
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub pageserver_connstr: Option<String>,
}
#[derive(Debug, thiserror::Error)]
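
Dropping the field is backwards-compatible on the read side: serde ignores unknown JSON keys by default, so status payloads published by older safekeepers that still carry `pageserver_connstr` keep deserializing. A minimal sketch (not from the commit; field set trimmed down to the relevant member) illustrating this:

```rust
// Requires serde (with the "derive" feature) and serde_json.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SkTimelineInfo {
    #[serde(default)]
    safekeeper_connstr: Option<String>,
}

fn main() {
    // A payload from an older safekeeper that still publishes the removed key.
    let old_payload = r#"{
        "safekeeper_connstr": "sk-1:5454",
        "pageserver_connstr": "ps-1:6400"
    }"#;
    // The stale `pageserver_connstr` key is silently ignored.
    let info: SkTimelineInfo = serde_json::from_str(old_payload).unwrap();
    assert_eq!(info.safekeeper_connstr.as_deref(), Some("sk-1:5454"));
}
```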

View File

@@ -659,7 +659,6 @@ impl WalConnectionManager {
match wal_stream_connection_string(
self.id,
info.safekeeper_connstr.as_deref()?,
info.pageserver_connstr.as_deref()?,
) {
Ok(connstr) => Some((sk_id, info, connstr)),
Err(e) => {
@@ -749,7 +748,6 @@ fn wal_stream_connection_string(
timeline_id,
}: ZTenantTimelineId,
listen_pg_addr_str: &str,
pageserver_connstr: &str,
) -> anyhow::Result<String> {
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
let me_conf = sk_connstr
@@ -759,7 +757,7 @@ fn wal_stream_connection_string(
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
))
}
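
The walreceiver connection string no longer smuggles the pageserver address through the `options` parameter. A self-contained sketch of the new format with illustrative values (the real function first derives host and port by parsing `postgresql://no_user@{addr}/no_db`):

```rust
// Sketch only: ids and address are plain strings here, not the real types.
fn wal_stream_connection_string_sketch(
    tenant_id: &str,
    timeline_id: &str,
    host: &str,
    port: u16,
) -> String {
    // The pageserver_connstr option is gone; only the timeline and tenant ids
    // are forwarded to the safekeeper.
    format!("host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'")
}

fn main() {
    println!(
        "{}",
        wal_stream_connection_string_sketch("tenant-a", "timeline-b", "sk-1", 5454)
    );
}
```

Running it prints something like `host=sk-1 port=5454 options='-c ztimelineid=timeline-b ztenantid=tenant-a'`.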
@@ -792,20 +790,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: None,
pageserver_connstr: Some("no safekeeper_connstr".to_string()),
},
),
(
NodeId(1),
SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no pageserver_connstr".to_string()),
pageserver_connstr: None,
},
),
(
@@ -818,7 +802,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
},
),
(
@@ -831,7 +814,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
},
),
]));
@@ -887,7 +869,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -900,7 +881,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
pageserver_connstr: Some("not advanced Lsn (p)".to_string()),
},
),
(
@@ -915,7 +895,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()),
},
),
]));
@@ -947,7 +926,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
)]))
.expect("Expected one candidate selected out of the only data option, but got none");
@@ -960,9 +938,6 @@ mod tests {
assert!(only_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(only_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
let selected_lsn = 100_000;
let biggest_wal_candidate = data_manager_with_no_connection
@@ -977,7 +952,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
pageserver_connstr: Some("smaller commit_lsn (p)".to_string()),
},
),
(
@@ -990,7 +964,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -1003,9 +976,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: None,
pageserver_connstr: Some(
"no safekeeper_connstr despite bigger commit_lsn".to_string(),
),
},
),
]))
@@ -1022,9 +992,6 @@ mod tests {
assert!(biggest_wal_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(biggest_wal_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
Ok(())
}
@@ -1071,7 +1038,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -1084,7 +1050,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()),
},
),
]);
@@ -1108,9 +1073,6 @@ mod tests {
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains("advanced by Lsn safekeeper"));
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains("advanced by Lsn safekeeper (p)"));
Ok(())
}
@@ -1146,7 +1108,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
)]))
.expect(
@@ -1168,9 +1129,6 @@ mod tests {
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
Ok(())
}
@@ -1197,7 +1155,6 @@ mod tests {
}
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
// the function itself does not need to be async, but it spawns a tokio::task underneath and hence needs
// a runtime to avoid panicking
@@ -1205,9 +1162,8 @@ mod tests {
id: ZTenantTimelineId,
safekeeper_id: NodeId,
) -> WalConnectionData {
let dummy_connstr =
wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
.expect("Failed to construct dummy wal producer connstr");
let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR)
.expect("Failed to construct dummy wal producer connstr");
WalConnectionData {
safekeeper_id,
connection: WalReceiverConnection::open(
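
The tests above exercise candidate selection: a safekeeper is now viable as soon as it reports a `safekeeper_connstr`, whereas previously a missing `pageserver_connstr` also disqualified it. A hypothetical, condensed sketch of the skip-on-None pattern (`?` inside a closure returning `Option`, as in `info.safekeeper_connstr.as_deref()?` above), with `u64` standing in for `Lsn`:

```rust
struct SkTimelineInfo {
    commit_lsn: Option<u64>,
    safekeeper_connstr: Option<String>,
}

// Pick the candidate with the biggest commit_lsn; `?` drops any entry that
// lacks a commit_lsn or a safekeeper_connstr.
fn best_candidate(infos: &[SkTimelineInfo]) -> Option<(u64, String)> {
    infos
        .iter()
        .filter_map(|info| Some((info.commit_lsn?, info.safekeeper_connstr.clone()?)))
        .max_by_key(|(lsn, _)| *lsn)
}

fn main() {
    let infos = vec![
        // Skipped: no safekeeper_connstr, even though it has a commit_lsn.
        SkTimelineInfo { commit_lsn: Some(1), safekeeper_connstr: None },
        SkTimelineInfo {
            commit_lsn: Some(42),
            safekeeper_connstr: Some("sk-2:5454".to_string()),
        },
    ];
    assert_eq!(best_candidate(&infos), Some((42, "sk-2:5454".to_string())));
}
```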

View File

@@ -29,12 +29,11 @@ pub struct SafekeeperPostgresHandler {
pub ztenantid: Option<ZTenantId>,
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
pageserver_connstr: Option<String>,
}
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush { pageserver_connstr: Option<String> },
StartWalPush,
StartReplication { start_lsn: Lsn },
IdentifySystem,
JSONCtrl { cmd: AppendLogicalMessage },
@@ -42,11 +41,7 @@ enum SafekeeperPostgresCommand {
fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
let caps = re.captures(cmd).unwrap();
let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") {
let re =
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
@@ -86,8 +81,6 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
self.appname = Some(app_name.clone());
}
self.pageserver_connstr = params.get("pageserver_connstr").cloned();
Ok(())
} else {
bail!("Safekeeper received unexpected initial message: {:?}", sm);
@@ -113,14 +106,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
match cmd {
SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
ReceiveWalConn::new(pgb, pageserver_connstr)
SafekeeperPostgresCommand::StartWalPush => {
ReceiveWalConn::new(pgb)
.run(self)
.context("failed to run ReceiveWalConn")?;
}
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb)
.run(self, pgb, start_lsn, self.pageserver_connstr.clone())
.run(self, pgb, start_lsn)
.context("failed to run ReplicationConn")?;
}
SafekeeperPostgresCommand::IdentifySystem => {
@@ -142,7 +135,6 @@ impl SafekeeperPostgresHandler {
ztenantid: None,
ztimelineid: None,
timeline: None,
pageserver_connstr: None,
}
}
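
With the argument gone, `START_WAL_PUSH` needs no regex capture at all. A minimal sketch of the simplified parsing, with a reduced command set and `String` errors standing in for the real error type:

```rust
#[derive(Debug, PartialEq)]
enum SafekeeperPostgresCommand {
    StartWalPush,
    IdentifySystem,
}

fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand, String> {
    if cmd.starts_with("START_WAL_PUSH") {
        // No capture group anymore: the pageserver connstr argument is gone.
        Ok(SafekeeperPostgresCommand::StartWalPush)
    } else if cmd.starts_with("IDENTIFY_SYSTEM") {
        Ok(SafekeeperPostgresCommand::IdentifySystem)
    } else {
        Err(format!("unknown command: {cmd}"))
    }
}

fn main() {
    assert_eq!(
        parse_cmd("START_WAL_PUSH"),
        Ok(SafekeeperPostgresCommand::StartWalPush)
    );
    // Any trailing argument is now simply ignored rather than captured.
    assert_eq!(
        parse_cmd("START_WAL_PUSH host=ps-1"),
        Ok(SafekeeperPostgresCommand::StartWalPush)
    );
}
```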

View File

@@ -32,22 +32,14 @@ pub struct ReceiveWalConn<'pg> {
pg_backend: &'pg mut PostgresBackend,
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
peer_addr: SocketAddr,
/// Pageserver connection string forwarded from compute.
/// NOTE that it is allowed to operate without a pageserver,
/// so if the compute has no pageserver configured, do not use it.
pageserver_connstr: Option<String>,
}
impl<'pg> ReceiveWalConn<'pg> {
pub fn new(
pg: &'pg mut PostgresBackend,
pageserver_connstr: Option<String>,
) -> ReceiveWalConn<'pg> {
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
let peer_addr = *pg.get_peer_addr();
ReceiveWalConn {
pg_backend: pg,
peer_addr,
pageserver_connstr,
}
}
@@ -120,9 +112,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Register the connection and defer unregister. Do that only
// after processing first message, as it sets wal_seg_size,
// wanted by many.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
spg.timeline.get().on_compute_connect()?;
_guard = Some(ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
});
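
The `ComputeConnectionGuard` seen above defers unregistration until the connection is dropped. A small sketch of that RAII pattern under assumed names, with a bare counter standing in for the real timeline state:

```rust
use std::sync::{Arc, Mutex};

struct Timeline {
    num_computes: Mutex<u32>,
}

struct ComputeConnectionGuard {
    timeline: Arc<Timeline>,
}

impl Drop for ComputeConnectionGuard {
    // Unregister automatically, however the connection handler exits.
    fn drop(&mut self) {
        *self.timeline.num_computes.lock().unwrap() -= 1;
    }
}

fn register(timeline: &Arc<Timeline>) -> ComputeConnectionGuard {
    *timeline.num_computes.lock().unwrap() += 1;
    ComputeConnectionGuard { timeline: Arc::clone(timeline) }
}

fn main() {
    let tli = Arc::new(Timeline { num_computes: Mutex::new(0) });
    {
        let _guard = register(&tli);
        assert_eq!(*tli.num_computes.lock().unwrap(), 1);
    } // guard dropped here
    assert_eq!(*tli.num_computes.lock().unwrap(), 0);
}
```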

View File

@@ -162,9 +162,8 @@ impl ReplicationConn {
spg: &mut SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
mut start_pos: Lsn,
pageserver_connstr: Option<String>,
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());
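
The tracing span now records only the timeline id. A runnable sketch of the same `info_span!` usage, assuming the `tracing` and `tracing-subscriber` crates:

```rust
use tracing::info_span;

fn wal_sender_span_demo(timeline_id: &str) {
    // `%` records the value via its Display impl; `.entered()` keeps the span
    // active for the current scope. No pageserver_connstr field anymore.
    let _enter = info_span!("WAL sender", timeline = %timeline_id).entered();
    tracing::info!("streaming WAL");
}

fn main() {
    tracing_subscriber::fmt().init();
    wal_sender_span_demo("11111111-2222-3333-4444-555555555555");
}
```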

View File

@@ -95,7 +95,6 @@ struct SharedState {
/// when tli is inactive instead of having this flag.
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
last_removed_segno: XLogSegNo,
}
@@ -119,7 +118,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
@@ -139,7 +137,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
@@ -190,35 +187,6 @@ impl SharedState {
self.wal_backup_active
}
/// Activate the timeline's walsender: start or update the timeline information propagated into etcd for further pageserver connections.
fn activate_walsender(
&mut self,
zttid: &ZTenantTimelineId,
new_pageserver_connstr: Option<String>,
) {
if self.pageserver_connstr != new_pageserver_connstr {
self.deactivate_walsender(zttid);
if new_pageserver_connstr.is_some() {
info!(
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
zttid.timeline_id,
);
}
self.pageserver_connstr = new_pageserver_connstr;
}
}
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
info!(
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
zttid.timeline_id,
)
}
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -318,17 +286,12 @@ impl Timeline {
/// Register compute connection, starting timeline-related activity if it is
/// not running yet.
/// Can fail only if the channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
pub fn on_compute_connect(&self) -> Result<()> {
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
// FIXME: currently we always adopt the latest pageserver connstr, but we
// should have some kind of generation assigned by compute to distinguish
// the latest one, or even pass it through consensus to reliably deliver
// it to all safekeepers.
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
@@ -364,7 +327,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.deactivate_walsender(&self.zttid);
shared_state.update_status();
return Ok(true);
}
}
@@ -525,7 +488,6 @@ impl Timeline {
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
pageserver_connstr: shared_state.pageserver_connstr.clone(),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}
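
After the change, registering a compute connection only bumps the counter and recomputes the timeline status; nothing pageserver-specific is latched anymore. A condensed sketch of that flow, with a `bool` return standing in for the real `Result` and wal-backup signalling:

```rust
use std::sync::Mutex;

struct SharedState {
    num_computes: u32,
    active: bool,
}

impl SharedState {
    // Stand-in for the real update_status(); returns whether a WAL backup
    // action is pending (an assumption for this sketch).
    fn update_status(&mut self) -> bool {
        self.active = self.num_computes > 0;
        self.active
    }
}

struct Timeline {
    mutex: Mutex<SharedState>,
}

impl Timeline {
    fn on_compute_connect(&self) -> bool {
        let mut shared_state = self.mutex.lock().unwrap();
        shared_state.num_computes += 1;
        // No activate_walsender() call and no pageserver connstr to latch.
        shared_state.update_status()
    }
}

fn main() {
    let tli = Timeline {
        mutex: Mutex::new(SharedState { num_computes: 0, active: false }),
    };
    assert!(tli.on_compute_connect());
}
```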