diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index e337ea5ab2..67121d4cea 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,11 +1,13 @@ //! This module contains functions to serve per-tenant background processes, //! such as compaction and GC +use std::ops::ControlFlow; + use crate::repository::Repository; use crate::tenant_mgr::TenantState; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; -use anyhow::Result; +use anyhow; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{self, Sender}; use tracing::*; @@ -14,7 +16,7 @@ use utils::zid::ZTenantId; /// /// Compaction task's main loop /// -async fn compaction_loop(tenantid: ZTenantId) -> Result<()> { +async fn compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { if let Err(err) = compaction_loop_ext(tenantid).await { error!("compact loop terminated with error: {:?}", err); Err(err) @@ -23,30 +25,35 @@ async fn compaction_loop(tenantid: ZTenantId) -> Result<()> { } } -async fn compaction_loop_ext(tenantid: ZTenantId) -> Result<()> { +async fn compaction_loop_ext(tenantid: ZTenantId) -> anyhow::Result<()> { loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let compaction_period = repo.get_compaction_period(); - - tokio::time::sleep(compaction_period).await; trace!("compaction loop for tenant {} waking up", tenantid); - // Compact timelines - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let compaction_result = - tokio::task::spawn_blocking(move || repo.compaction_iteration()).await; - match compaction_result { - Ok(Ok(())) => { - // success, do nothing + // Run blocking part of the task + let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || { + if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { + return Ok(ControlFlow::Break(())); + } + let repo = 
tenant_mgr::get_repository_for_tenant(tenantid)?; + let compaction_period = repo.get_compaction_period(); + repo.compaction_iteration()?; + Ok(ControlFlow::Continue(compaction_period)) + }) + .await; + + // Handle result + match period { + Ok(Ok(ControlFlow::Continue(period))) => { + tokio::time::sleep(period).await; + } + Ok(Ok(ControlFlow::Break(()))) => { + break; } Ok(Err(e)) => { - anyhow::bail!(e.context("Compaction failed")); + anyhow::bail!("Compaction failed: {}", e); } Err(e) => { - anyhow::bail!("Compaction join error {}", e); + anyhow::bail!("Compaction join error: {}", e); } } } @@ -65,30 +72,30 @@ static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new(); /// Spawn a task that will periodically schedule garbage collection until /// the tenant becomes inactive. This should be called on tenant /// activation. -pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> { +pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_GC_LOOP .get() - .unwrap() + .expect("failed to get START_GC_LOOP") .blocking_send(tenantid) - .unwrap(); + .expect("Failed to send to START_GC_LOOP channel"); Ok(()) } /// Spawn a task that will periodically schedule compaction until /// the tenant becomes inactive. This should be called on tenant /// activation. 
-pub fn start_compaction_loop(tenantid: ZTenantId) -> Result<()> { +pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { START_COMPACTION_LOOP .get() - .unwrap() + .expect("failed to get START_COMPACTION_LOOP") .blocking_send(tenantid) - .unwrap(); + .expect("failed to send to START_COMPACTION_LOOP"); Ok(()) } /// Spawn the TenantTaskManager /// This needs to be called before start_gc_loop or start_compaction_loop -pub fn init_tenant_task_pool() -> Result<()> { +pub fn init_tenant_task_pool() -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("tenant-task-worker") .worker_threads(40) // Way more than necessary @@ -97,28 +104,37 @@ pub fn init_tenant_task_pool() -> Result<()> { .build()?; let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100); - START_GC_LOOP.set(gc_send).unwrap(); + START_GC_LOOP + .set(gc_send) + .expect("Failed to set START_GC_LOOP"); let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100); - START_COMPACTION_LOOP.set(compaction_send).unwrap(); + START_COMPACTION_LOOP + .set(compaction_send) + .expect("Failed to set START_COMPACTION_LOOP"); thread_mgr::spawn( ThreadKind::TenantTaskManager, None, None, - "WAL receiver manager main thread", + "Tenant task manager main thread", true, move || { runtime.block_on(async move { loop { tokio::select! 
{ - _ = thread_mgr::shutdown_watcher() => break, + _ = thread_mgr::shutdown_watcher() => { + // TODO cancel all running tasks + break + }, // TODO don't spawn if already running tenantid = gc_recv.recv() => { - tokio::spawn(gc_loop(tenantid.unwrap())); + let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); + tokio::spawn(gc_loop(tenantid)); }, tenantid = compaction_recv.recv() => { - tokio::spawn(compaction_loop(tenantid.unwrap())); + let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); + tokio::spawn(compaction_loop(tenantid)); }, } } @@ -133,38 +149,42 @@ pub fn init_tenant_task_pool() -> Result<()> { /// /// GC thread's main loop /// -async fn gc_loop(tenantid: ZTenantId) -> Result<()> { +async fn gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - trace!("gc loop for tenant {} waking up", tenantid); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let gc_period = repo.get_gc_period(); - let gc_horizon = repo.get_gc_horizon(); - // Garbage collect old files that are not needed for PITR anymore - if gc_horizon > 0 { - let gc_result = tokio::task::spawn_blocking(move || { - repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) - }) - .await; + // Run blocking part of the task + let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || { + if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { + return Ok(ControlFlow::Break(())); + } + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + let gc_period = repo.get_gc_period(); + let gc_horizon = repo.get_gc_horizon(); - match gc_result { - Ok(Ok(_gc_result)) => { - // Gc success, do nothing - } - Ok(Err(e)) => { - anyhow::bail!(e.context("Gc failed")); - } - Err(e) => { - anyhow::bail!("Gc join error {}", e); - } + if gc_horizon > 0 { + repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; + } + + 
Ok(ControlFlow::Continue(gc_period)) + }) + .await; + + // Handle result + match period { + Ok(Ok(ControlFlow::Continue(period))) => { + tokio::time::sleep(period).await; + } + Ok(Ok(ControlFlow::Break(()))) => { + break; + } + Ok(Err(e)) => { + anyhow::bail!("Gc failed: {}", e); + } + Err(e) => { + anyhow::bail!("Gc join error: {}", e); } } - - tokio::time::sleep(gc_period).await; } trace!( "GC loop stopped for tenant {} state is {:?}",