From ee36ca54d59161f896c1d353dd3a1a1d0ef22ef8 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 15 Jun 2022 09:16:57 -0400 Subject: [PATCH] Run compaction as task --- pageserver/src/tenant_mgr.rs | 17 +---------- pageserver/src/tenant_threads.rs | 48 ++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index f69f9cfe07..7d1187866f 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -330,23 +330,8 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: } (TenantState::Idle, TenantState::Active) => { info!("activating tenant {tenant_id}"); - let compactor_spawn_result = thread_mgr::spawn( - ThreadKind::Compactor, - Some(tenant_id), - None, - "Compactor thread", - false, - move || crate::tenant_threads::compact_loop(tenant_id), - ); - if compactor_spawn_result.is_err() { - let mut m = tenants_state::write_tenants(); - m.get_mut(&tenant_id) - .with_context(|| format!("Tenant not found for id {tenant_id}"))? - .state = old_state; - drop(m); - } - compactor_spawn_result?; + crate::tenant_threads::start_compaction_loop(tenant_id)?; crate::tenant_threads::start_gc_loop(tenant_id)?; } (TenantState::Idle, TenantState::Stopping) => { diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index f798c74493..ead445ef15 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -14,8 +14,8 @@ use utils::zid::ZTenantId; /// /// Compaction thread's main loop /// -pub fn compact_loop(tenantid: ZTenantId) -> Result<()> { - if let Err(err) = compact_loop_ext(tenantid) { +pub async fn compaction_loop(tenantid: ZTenantId) -> Result<()> { + if let Err(err) = compaction_loop_ext(tenantid).await { error!("compact loop terminated with error: {:?}", err); Err(err) } else { @@ -23,7 +23,7 @@ pub fn compact_loop(tenantid: ZTenantId) -> Result<()> { } } -fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { +async fn compaction_loop_ext(tenantid: ZTenantId) -> Result<()> { loop { if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { break; @@ -36,7 +36,19 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { // Compact timelines let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.compaction_iteration()?; + let compaction_result = + tokio::task::spawn_blocking(move || repo.compaction_iteration()).await; + match compaction_result { + Ok(Ok(())) => { + // success, do nothing + } + Ok(Err(e)) => { + anyhow::bail!(e.context("Compaction failed")); + } + Err(e) => { + anyhow::bail!("Compaction join error {}", e); + } + } } trace!( @@ -48,6 +60,7 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { } static START_GC_LOOP: OnceCell> = OnceCell::new(); +static START_COMPACTION_LOOP: OnceCell> = OnceCell::new(); pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> { START_GC_LOOP @@ -58,6 +71,15 @@ pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> { Ok(()) } +pub fn start_compaction_loop(tenantid: ZTenantId) -> Result<()> { + START_COMPACTION_LOOP + .get() + .unwrap() + .blocking_send(tenantid) + .unwrap(); + Ok(()) +} + pub fn init_tenant_task_pool() -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("tenant-task-worker") @@ -66,8 +88,11 @@ pub fn init_tenant_task_pool() -> Result<()> { .enable_all() .build()?; - let (send, mut recv) = mpsc::channel::(100); - START_GC_LOOP.set(send).unwrap(); + let (gc_send, mut gc_recv) = mpsc::channel::(100); + START_GC_LOOP.set(gc_send).unwrap(); + + let (compaction_send, mut compaction_recv) = mpsc::channel::(100); + START_COMPACTION_LOOP.set(compaction_send).unwrap(); thread_mgr::spawn( ThreadKind::WalReceiverManager, // TODO @@ -80,9 +105,12 @@ pub fn init_tenant_task_pool() -> Result<()> { loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => break, - tenantid = recv.recv() => { + tenantid = gc_recv.recv() => { tokio::spawn(gc_loop(tenantid.unwrap())); }, + tenantid = compaction_recv.recv() => { + tokio::spawn(compaction_loop(tenantid.unwrap())); + }, } } }); @@ -119,12 +147,10 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> { // Gc success, do nothing } Ok(Err(e)) => { - error!("Gc failed: {}", e); - // TODO maybe also don't reschedule on error? + anyhow::bail!(e.context("Gc failed")); } Err(e) => { - error!("Gc failed: {}", e); - // TODO maybe also don't reschedule on error? + anyhow::bail!("Gc join error {}", e); } } }