From fa522bc57979b581ac84f2dfe9add6de22f2291a Mon Sep 17 00:00:00 2001 From: LFC Date: Wed, 21 Jun 2023 14:49:32 +0800 Subject: [PATCH] fix: drop region alive countdown tasks when deregistering table (#1808) --- Cargo.lock | 1 + src/catalog/src/remote/region_alive_keeper.rs | 60 ++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 24f9769e5d..d9e8c1ca52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8520,6 +8520,7 @@ dependencies = [ "axum-macros", "axum-test-helper", "base64 0.13.1", + "build-data", "bytes", "catalog", "chrono", diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 61daee4cf1..9b64e35559 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -104,10 +104,14 @@ impl RegionAliveKeepers { Ok(()) } - pub async fn deregister_table(&self, table_ident: &TableIdent) { - if self.keepers.lock().await.remove(table_ident).is_some() { + pub async fn deregister_table( + &self, + table_ident: &TableIdent, + ) -> Option> { + self.keepers.lock().await.remove(table_ident).map(|x| { info!("Deregister RegionAliveKeeper for table {table_ident}"); - } + x + }) } pub async fn register_region(&self, region_ident: &RegionIdent) { @@ -127,7 +131,7 @@ impl RegionAliveKeepers { warn!("Alive keeper for region {region_ident} is not found!"); return; }; - keeper.deregister_region(region_ident.region_number).await + let _ = keeper.deregister_region(region_ident.region_number).await; } pub async fn start(&self) { @@ -230,9 +234,11 @@ impl RegionAliveKeeper { return; } - let countdown_task_handles = self.countdown_task_handles.clone(); + let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles); let on_task_finished = async move { - let _ = countdown_task_handles.lock().await.remove(®ion); + if let Some(x) = countdown_task_handles.upgrade() { + x.lock().await.remove(®ion); + } // Else the countdown task handles map could be dropped because the keeper is dropped. }; let handle = Arc::new(CountdownTaskHandle::new( self.table_engine.clone(), @@ -259,19 +265,18 @@ impl RegionAliveKeeper { } } - async fn deregister_region(&self, region: RegionNumber) { - if self - .countdown_task_handles + async fn deregister_region(&self, region: RegionNumber) -> Option> { + self.countdown_task_handles .lock() .await .remove(®ion) - .is_some() - { - info!( - "Deregister alive countdown for region {region} in table {}", - self.table_ident - ) - } + .map(|x| { + info!( + "Deregister alive countdown for region {region} in table {}", + self.table_ident + ); + x + }) } async fn start(&self) { @@ -319,6 +324,8 @@ enum CountdownCommand { struct CountdownTaskHandle { tx: mpsc::Sender, handler: JoinHandle<()>, + table_ident: TableIdent, + region: RegionNumber, } impl CountdownTaskHandle { @@ -341,7 +348,7 @@ impl CountdownTaskHandle { let mut countdown_task = CountdownTask { table_engine, - table_ident, + table_ident: table_ident.clone(), region, rx, }; @@ -350,7 +357,12 @@ impl CountdownTaskHandle { on_task_finished().await; }); - Self { tx, handler } + Self { + tx, + handler, + table_ident, + region, + } } async fn start(&self, heartbeat_interval_millis: u64) { @@ -378,7 +390,11 @@ impl CountdownTaskHandle { impl Drop for CountdownTaskHandle { fn drop(&mut self) { - self.handler.abort() + debug!( + "Aborting region alive countdown task for region {} in table {}", + self.region, self.table_ident, + ); + self.handler.abort(); } } @@ -640,7 +656,8 @@ mod test { regions.sort(); assert_eq!(regions, vec![2, 3, 4]); - keepers.deregister_table(&table_ident).await; + let keeper = keepers.deregister_table(&table_ident).await.unwrap(); + assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped"); assert!(keepers.keepers.lock().await.is_empty()); } @@ -676,7 +693,8 @@ mod test { // assert keep_lived works if keeper is started assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later()); - keeper.deregister_region(region).await; + let handle = keeper.deregister_region(region).await.unwrap(); + assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped"); assert!(keeper.find_handle(®ion).await.is_none()); }