mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
fix: drop region alive countdown tasks when deregistering table (#1808)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8520,6 +8520,7 @@ dependencies = [
|
||||
"axum-macros",
|
||||
"axum-test-helper",
|
||||
"base64 0.13.1",
|
||||
"build-data",
|
||||
"bytes",
|
||||
"catalog",
|
||||
"chrono",
|
||||
|
||||
@@ -104,10 +104,14 @@ impl RegionAliveKeepers {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn deregister_table(&self, table_ident: &TableIdent) {
|
||||
if self.keepers.lock().await.remove(table_ident).is_some() {
|
||||
pub async fn deregister_table(
|
||||
&self,
|
||||
table_ident: &TableIdent,
|
||||
) -> Option<Arc<RegionAliveKeeper>> {
|
||||
self.keepers.lock().await.remove(table_ident).map(|x| {
|
||||
info!("Deregister RegionAliveKeeper for table {table_ident}");
|
||||
}
|
||||
x
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn register_region(&self, region_ident: &RegionIdent) {
|
||||
@@ -127,7 +131,7 @@ impl RegionAliveKeepers {
|
||||
warn!("Alive keeper for region {region_ident} is not found!");
|
||||
return;
|
||||
};
|
||||
keeper.deregister_region(region_ident.region_number).await
|
||||
let _ = keeper.deregister_region(region_ident.region_number).await;
|
||||
}
|
||||
|
||||
pub async fn start(&self) {
|
||||
@@ -230,9 +234,11 @@ impl RegionAliveKeeper {
|
||||
return;
|
||||
}
|
||||
|
||||
let countdown_task_handles = self.countdown_task_handles.clone();
|
||||
let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles);
|
||||
let on_task_finished = async move {
|
||||
let _ = countdown_task_handles.lock().await.remove(®ion);
|
||||
if let Some(x) = countdown_task_handles.upgrade() {
|
||||
x.lock().await.remove(®ion);
|
||||
} // Else the countdown task handles map could be dropped because the keeper is dropped.
|
||||
};
|
||||
let handle = Arc::new(CountdownTaskHandle::new(
|
||||
self.table_engine.clone(),
|
||||
@@ -259,19 +265,18 @@ impl RegionAliveKeeper {
|
||||
}
|
||||
}
|
||||
|
||||
async fn deregister_region(&self, region: RegionNumber) {
|
||||
if self
|
||||
.countdown_task_handles
|
||||
async fn deregister_region(&self, region: RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
|
||||
self.countdown_task_handles
|
||||
.lock()
|
||||
.await
|
||||
.remove(®ion)
|
||||
.is_some()
|
||||
{
|
||||
info!(
|
||||
"Deregister alive countdown for region {region} in table {}",
|
||||
self.table_ident
|
||||
)
|
||||
}
|
||||
.map(|x| {
|
||||
info!(
|
||||
"Deregister alive countdown for region {region} in table {}",
|
||||
self.table_ident
|
||||
);
|
||||
x
|
||||
})
|
||||
}
|
||||
|
||||
async fn start(&self) {
|
||||
@@ -319,6 +324,8 @@ enum CountdownCommand {
|
||||
struct CountdownTaskHandle {
|
||||
tx: mpsc::Sender<CountdownCommand>,
|
||||
handler: JoinHandle<()>,
|
||||
table_ident: TableIdent,
|
||||
region: RegionNumber,
|
||||
}
|
||||
|
||||
impl CountdownTaskHandle {
|
||||
@@ -341,7 +348,7 @@ impl CountdownTaskHandle {
|
||||
|
||||
let mut countdown_task = CountdownTask {
|
||||
table_engine,
|
||||
table_ident,
|
||||
table_ident: table_ident.clone(),
|
||||
region,
|
||||
rx,
|
||||
};
|
||||
@@ -350,7 +357,12 @@ impl CountdownTaskHandle {
|
||||
on_task_finished().await;
|
||||
});
|
||||
|
||||
Self { tx, handler }
|
||||
Self {
|
||||
tx,
|
||||
handler,
|
||||
table_ident,
|
||||
region,
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(&self, heartbeat_interval_millis: u64) {
|
||||
@@ -378,7 +390,11 @@ impl CountdownTaskHandle {
|
||||
|
||||
impl Drop for CountdownTaskHandle {
|
||||
fn drop(&mut self) {
|
||||
self.handler.abort()
|
||||
debug!(
|
||||
"Aborting region alive countdown task for region {} in table {}",
|
||||
self.region, self.table_ident,
|
||||
);
|
||||
self.handler.abort();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -640,7 +656,8 @@ mod test {
|
||||
regions.sort();
|
||||
assert_eq!(regions, vec![2, 3, 4]);
|
||||
|
||||
keepers.deregister_table(&table_ident).await;
|
||||
let keeper = keepers.deregister_table(&table_ident).await.unwrap();
|
||||
assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped");
|
||||
assert!(keepers.keepers.lock().await.is_empty());
|
||||
}
|
||||
|
||||
@@ -676,7 +693,8 @@ mod test {
|
||||
// assert keep_lived works if keeper is started
|
||||
assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later());
|
||||
|
||||
keeper.deregister_region(region).await;
|
||||
let handle = keeper.deregister_region(region).await.unwrap();
|
||||
assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped");
|
||||
assert!(keeper.find_handle(®ion).await.is_none());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user