diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 67121d4cea..ce75df21bb 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,15 +1,17 @@ //! This module contains functions to serve per-tenant background processes, //! such as compaction and GC +use std::collections::HashMap; use std::ops::ControlFlow; use crate::repository::Repository; use crate::tenant_mgr::TenantState; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; -use anyhow; +use anyhow::{self, Context}; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{self, Sender}; +use tokio::task::JoinHandle; use tracing::*; use utils::zid::ZTenantId; @@ -75,9 +77,10 @@ static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new(); pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_GC_LOOP .get() - .expect("failed to get START_GC_LOOP") + .context("Failed to get START_GC_LOOP")? .blocking_send(tenantid) - .expect("Failed to send to START_GC_LOOP channel"); + .map_err(|e| anyhow::anyhow!(e)) + .context("Failed to send to START_GC_LOOP channel")?; Ok(()) } @@ -87,9 +90,10 @@ pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_COMPACTION_LOOP .get() - .expect("failed to get START_COMPACTION_LOOP") + .context("failed to get START_COMPACTION_LOOP")? 
.blocking_send(tenantid) - .expect("failed to send to START_COMPACTION_LOOP"); + .map_err(|e| anyhow::anyhow!(e)) + .context("failed to send to START_COMPACTION_LOOP")?; Ok(()) } @@ -113,6 +117,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { .set(compaction_send) .expect("Failed to set START_COMPACTION_LOOP"); + // TODO this is getting repetitive + let mut gc_loops = HashMap::>>::new(); + let mut compaction_loops = HashMap::>>::new(); + thread_mgr::spawn( ThreadKind::TenantTaskManager, None, @@ -124,17 +132,29 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => { - // TODO cancel all running tasks - break + for (_, handle) in gc_loops.drain() { + handle.abort(); + } + for (_, handle) in compaction_loops.drain() { + handle.abort(); + } + break; }, - // TODO don't spawn if already running tenantid = gc_recv.recv() => { let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); - tokio::spawn(gc_loop(tenantid)); + let new_handle = tokio::spawn(gc_loop(tenantid)); + if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); - tokio::spawn(compaction_loop(tenantid)); + let new_handle = tokio::spawn(compaction_loop(tenantid)); + if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, } }