diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 349a1f7060..ce75df21bb 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,6 +1,7 @@ //! This module contains functions to serve per-tenant background processes, //! such as compaction and GC +use std::collections::HashMap; use std::ops::ControlFlow; use crate::repository::Repository; @@ -10,6 +11,7 @@ use crate::{tenant_mgr, thread_mgr}; use anyhow::{self, Context}; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{self, Sender}; +use tokio::task::JoinHandle; use tracing::*; use utils::zid::ZTenantId; @@ -115,6 +117,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { .set(compaction_send) .expect("Failed to set START_COMPACTION_LOOP"); + // TODO this is getting repetitive + let mut gc_loops = HashMap::>>::new(); + let mut compaction_loops = HashMap::>>::new(); + thread_mgr::spawn( ThreadKind::TenantTaskManager, None, @@ -126,18 +132,29 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => { - // TODO cancel all running tasks - break + for (_, handle) in gc_loops.drain() { + handle.abort(); + } + for (_, handle) in compaction_loops.drain() { + handle.abort(); + } + break; }, tenantid = gc_recv.recv() => { let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); - // TODO cancel existing loop, if any. - tokio::spawn(gc_loop(tenantid)); + let new_handle = tokio::spawn(gc_loop(tenantid)); + if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); - // TODO cancel existing loop, if any. - tokio::spawn(compaction_loop(tenantid)); + let new_handle = tokio::spawn(compaction_loop(tenantid)); + if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, } }