mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Fix leaked keepalive task in s3 offloading leader election.
I still don't like the surroundings and feel we'd better get away without using election API at all, but this is a quick fix to keep CI green. ref #1815
This commit is contained in:
@@ -90,7 +90,7 @@ impl ElectionLeader {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
|
||||
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
|
||||
let mut client = Client::connect(req.broker_endpoints.clone(), None)
|
||||
.await
|
||||
.context("Could not connect to etcd")?;
|
||||
@@ -102,22 +102,27 @@ pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
|
||||
|
||||
let lease_id = lease.map(|l| l.id()).unwrap();
|
||||
|
||||
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
|
||||
// kill previous keepalive, if any
|
||||
if let Some(l) = leader.take() {
|
||||
l.give_up().await;
|
||||
}
|
||||
|
||||
if let Err(e) = client
|
||||
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
|
||||
// immediately save handle to kill task if we get canceled below
|
||||
*leader = Some(ElectionLeader {
|
||||
client: client.clone(),
|
||||
keep_alive,
|
||||
});
|
||||
|
||||
client
|
||||
.campaign(
|
||||
req.election_name.clone(),
|
||||
req.candidate_name.clone(),
|
||||
lease_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
keep_alive.abort();
|
||||
let _ = keep_alive.await;
|
||||
return Err(e.into());
|
||||
}
|
||||
.await?;
|
||||
|
||||
Ok(ElectionLeader { client, keep_alive })
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
|
||||
|
||||
@@ -200,20 +200,11 @@ impl WalBackupTask {
|
||||
loop {
|
||||
let mut retry_attempt = 0u32;
|
||||
|
||||
if let Some(l) = self.leader.take() {
|
||||
l.give_up().await;
|
||||
}
|
||||
|
||||
info!("acquiring leadership");
|
||||
match broker::get_leader(&self.election).await {
|
||||
Ok(l) => {
|
||||
self.leader = Some(l);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error during leader election {:?}", e);
|
||||
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
|
||||
error!("error during leader election {:?}", e);
|
||||
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
|
||||
continue;
|
||||
}
|
||||
info!("acquired leadership");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user