diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index 5be8091a7e..3d75fec587 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -90,7 +90,7 @@ impl ElectionLeader {
     }
 }
 
-pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
+pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
     let mut client = Client::connect(req.broker_endpoints.clone(), None)
         .await
         .context("Could not connect to etcd")?;
@@ -102,22 +102,27 @@ pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
 
     let lease_id = lease.map(|l| l.id()).unwrap();
 
-    let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
+    // kill previous keepalive, if any
+    if let Some(l) = leader.take() {
+        l.give_up().await;
+    }
 
-    if let Err(e) = client
+    let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
+    // immediately save handle to kill task if we get canceled below
+    *leader = Some(ElectionLeader {
+        client: client.clone(),
+        keep_alive,
+    });
+
+    client
         .campaign(
             req.election_name.clone(),
             req.candidate_name.clone(),
             lease_id,
         )
-        .await
-    {
-        keep_alive.abort();
-        let _ = keep_alive.await;
-        return Err(e.into());
-    }
+        .await?;
 
-    Ok(ElectionLeader { client, keep_alive })
+    Ok(())
 }
 
 async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 30364ce434..1f2e9c303a 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -200,20 +200,11 @@ impl WalBackupTask {
         loop {
             let mut retry_attempt = 0u32;
 
-            if let Some(l) = self.leader.take() {
-                l.give_up().await;
-            }
-
             info!("acquiring leadership");
-            match broker::get_leader(&self.election).await {
-                Ok(l) => {
-                    self.leader = Some(l);
-                }
-                Err(e) => {
-                    error!("error during leader election {:?}", e);
-                    sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
-                    continue;
-                }
+            if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
+                error!("error during leader election {:?}", e);
+                sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
+                continue;
             }
 
             info!("acquired leadership");