diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 1a8d72cddf..63b5532390 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -331,6 +331,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. + // TODO maybe use tokio::sync::watch instead? crate::tenant_threads::start_compaction_loop(tenant_id)?; crate::tenant_threads::start_gc_loop(tenant_id)?; } diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 57f23da033..00f44bdb49 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -10,17 +10,22 @@ use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; use anyhow::{self, Context}; use once_cell::sync::OnceCell; -use tokio::sync::mpsc::{self, Sender}; -use tokio::task::JoinHandle; +use tokio::sync::mpsc; +use tokio::sync::watch; use tracing::*; use utils::zid::ZTenantId; +// TODO metrics + /// /// Compaction task's main loop /// -async fn compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { +async fn compaction_loop( + tenantid: ZTenantId, + mut cancel: watch::Receiver<()>, +) -> anyhow::Result<()> { loop { - trace!("compaction loop for tenant {} waking up", tenantid); + trace!("waking up"); // Run blocking part of the task let period: Result, _> = tokio::task::spawn_blocking(move || { @@ -37,7 +42,13 @@ async fn compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { // Handle result match period { Ok(Ok(ControlFlow::Continue(period))) => { - tokio::time::sleep(period).await; + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + } + _ = tokio::time::sleep(period) => {}, + } } Ok(Ok(ControlFlow::Break(()))) => { break; @@ -52,15 +63,14 @@ async fn compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { } trace!( - "compaction loop stopped for tenant {} state is {:?}", - tenantid, + "compaction loop stopped. State is {:?}", tenant_mgr::get_tenant_state(tenantid) ); Ok(()) } -static START_GC_LOOP: OnceCell> = OnceCell::new(); -static START_COMPACTION_LOOP: OnceCell> = OnceCell::new(); +static START_GC_LOOP: OnceCell> = OnceCell::new(); +static START_COMPACTION_LOOP: OnceCell> = OnceCell::new(); /// Spawn a task that will periodically schedule garbage collection until /// the tenant becomes inactive. This should be called on tenant @@ -109,8 +119,8 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { .expect("Failed to set START_COMPACTION_LOOP"); // TODO this is getting repetitive - let mut gc_loops = HashMap::>>::new(); - let mut compaction_loops = HashMap::>>::new(); + let mut gc_loops = HashMap::>::new(); + let mut compaction_loops = HashMap::>::new(); thread_mgr::spawn( ThreadKind::TenantTaskManager, @@ -123,30 +133,39 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => { - for (_, handle) in gc_loops.drain() { - handle.abort(); + // TODO do this from thread_mgr? Extend it to work with tasks? + for (_, cancel) in gc_loops.drain() { + cancel.send(()).ok(); } - for (_, handle) in compaction_loops.drain() { - handle.abort(); + for (_, cancel) in compaction_loops.drain() { + cancel.send(()).ok(); } + // TODO wait for tasks to die? break; }, tenantid = gc_recv.recv() => { let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); - let new_handle = tokio::spawn(gc_loop(tenantid)); - if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) { - // TODO use non-blocking cancel chan instead - old_handle.abort(); + + // Spawn new task, request cancellation of the old one if exists + let (cancel_send, cancel_recv) = watch::channel(()); + let _handle = tokio::spawn(gc_loop(tenantid, cancel_recv)) + .instrument(trace_span!("gc loop", tenant = %tenantid)); + if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) { + old_cancel_send.send(()).ok(); } }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); - let new_handle = tokio::spawn(compaction_loop(tenantid)); - if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) { - // TODO use non-blocking cancel chan instead - old_handle.abort(); + + // Spawn new task, request cancellation of the old one if exists + let (cancel_send, cancel_recv) = watch::channel(()); + let _handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)) + .instrument(trace_span!("compaction loop", tenant = %tenantid)); + if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) { + old_cancel_send.send(()).ok(); } }, + // TODO await return values? Report errors? } } }); @@ -160,9 +179,9 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { /// /// GC task's main loop /// -async fn gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { +async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow::Result<()> { loop { - trace!("gc loop for tenant {} waking up", tenantid); + trace!("waking up"); // Run blocking part of the task let period: Result, _> = tokio::task::spawn_blocking(move || { @@ -184,7 +203,13 @@ async fn gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { // Handle result match period { Ok(Ok(ControlFlow::Continue(period))) => { - tokio::time::sleep(period).await; + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + } + _ = tokio::time::sleep(period) => {}, + } } Ok(Ok(ControlFlow::Break(()))) => { break; @@ -198,8 +223,7 @@ async fn gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { } } trace!( - "GC loop stopped for tenant {} state is {:?}", - tenantid, + "GC loop stopped. State is {:?}", tenant_mgr::get_tenant_state(tenantid) ); Ok(())