From d7d4cc8c772e689d7288830b3c8ec3050d22be90 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Thu, 23 Jun 2022 12:56:21 -0400 Subject: [PATCH 1/3] Error instead of panic --- pageserver/src/tenant_threads.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 67121d4cea..50d321dc75 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -7,7 +7,7 @@ use crate::repository::Repository; use crate::tenant_mgr::TenantState; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; -use anyhow; +use anyhow::{self, Context}; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{self, Sender}; use tracing::*; @@ -75,9 +75,10 @@ static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new(); pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_GC_LOOP .get() - .expect("failed to get START_GC_LOOP") + .context("Failed to get START_GC_LOOP")? .blocking_send(tenantid) - .expect("Failed to send to START_GC_LOOP channel"); + .map_err(|e| anyhow::anyhow!(e)) + .context("Failed to send to START_GC_LOOP channel")?; Ok(()) } @@ -87,9 +88,10 @@ pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_COMPACTION_LOOP .get() - .expect("failed to get START_COMPACTION_LOOP") + .context("failed to get START_COMPACTION_LOOP")?
.blocking_send(tenantid) - .expect("failed to send to START_COMPACTION_LOOP"); + .map_err(|e| anyhow::anyhow!(e)) + .context("failed to send to START_COMPACTION_LOOP")?; Ok(()) } From 0f4552a54433abfb98144a6c5922002831c610fe Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Thu, 23 Jun 2022 12:59:20 -0400 Subject: [PATCH 2/3] Update TODO --- pageserver/src/tenant_threads.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 50d321dc75..349a1f7060 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -129,13 +129,14 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { // TODO cancel all running tasks break }, - // TODO don't spawn if already running tenantid = gc_recv.recv() => { let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); + // TODO cancel existing loop, if any. tokio::spawn(gc_loop(tenantid)); }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); + // TODO cancel existing loop, if any. tokio::spawn(compaction_loop(tenantid)); }, } From 692496d7330cc1cca0abeddfb4d1e6e012fc2947 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Thu, 23 Jun 2022 13:20:53 -0400 Subject: [PATCH 3/3] Cancel tasks --- pageserver/src/tenant_threads.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 349a1f7060..ce75df21bb 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,6 +1,7 @@ //! This module contains functions to serve per-tenant background processes, //! 
such as compaction and GC +use std::collections::HashMap; use std::ops::ControlFlow; use crate::repository::Repository; @@ -10,6 +11,7 @@ use crate::{tenant_mgr, thread_mgr}; use anyhow::{self, Context}; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{self, Sender}; +use tokio::task::JoinHandle; use tracing::*; use utils::zid::ZTenantId; @@ -115,6 +117,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { .set(compaction_send) .expect("Failed to set START_COMPACTION_LOOP"); + // TODO this is getting repetitive + let mut gc_loops = HashMap::<ZTenantId, JoinHandle<anyhow::Result<()>>>::new(); + let mut compaction_loops = HashMap::<ZTenantId, JoinHandle<anyhow::Result<()>>>::new(); + thread_mgr::spawn( ThreadKind::TenantTaskManager, None, @@ -126,18 +132,29 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => { - // TODO cancel all running tasks - break + for (_, handle) in gc_loops.drain() { + handle.abort(); + } + for (_, handle) in compaction_loops.drain() { + handle.abort(); + } + break; }, tenantid = gc_recv.recv() => { let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); - // TODO cancel existing loop, if any. - tokio::spawn(gc_loop(tenantid)); + let new_handle = tokio::spawn(gc_loop(tenantid)); + if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); - // TODO cancel existing loop, if any. - tokio::spawn(compaction_loop(tenantid)); + let new_handle = tokio::spawn(compaction_loop(tenantid)); + if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) { + // TODO use non-blocking cancel chan instead + old_handle.abort(); + } }, } }