Cancel tasks

This commit is contained in:
Bojan Serafimov
2022-06-23 13:20:53 -04:00
parent 0f4552a544
commit 692496d733

View File

@@ -1,6 +1,7 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::collections::HashMap;
use std::ops::ControlFlow;
use crate::repository::Repository;
@@ -10,6 +11,7 @@ use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use once_cell::sync::OnceCell;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
use tracing::*;
use utils::zid::ZTenantId;
@@ -115,6 +117,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
.set(compaction_send)
.expect("Failed to set START_COMPACTION_LOOP");
// TODO this is getting repetitive
let mut gc_loops = HashMap::<ZTenantId, JoinHandle<Result<(), anyhow::Error>>>::new();
let mut compaction_loops = HashMap::<ZTenantId, JoinHandle<Result<(), anyhow::Error>>>::new();
thread_mgr::spawn(
ThreadKind::TenantTaskManager,
None,
@@ -126,18 +132,29 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
// TODO cancel all running tasks
break
for (_, handle) in gc_loops.drain() {
handle.abort();
}
for (_, handle) in compaction_loops.drain() {
handle.abort();
}
break;
},
tenantid = gc_recv.recv() => {
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
// TODO cancel existing loop, if any.
tokio::spawn(gc_loop(tenantid));
let new_handle = tokio::spawn(gc_loop(tenantid));
if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) {
// TODO use non-blocking cancel chan instead
old_handle.abort();
}
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
// TODO cancel existing loop, if any.
tokio::spawn(compaction_loop(tenantid));
let new_handle = tokio::spawn(compaction_loop(tenantid));
if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) {
// TODO use non-blocking cancel chan instead
old_handle.abort();
}
},
}
}