diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1d407a29bc..22cc1c2edc 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -276,6 +276,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() let remote_index = tenant_mgr::init_tenant_mgr(conf)?; + pageserver::tenant_threads::init_tenant_task_pool()?; + // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits let auth_cloned = auth.clone(); diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c48b021d1f..f69f9cfe07 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -347,27 +347,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: } compactor_spawn_result?; - let gc_spawn_result = thread_mgr::spawn( - ThreadKind::GarbageCollector, - Some(tenant_id), - None, - "GC thread", - false, - move || crate::tenant_threads::gc_loop(tenant_id), - ) - .map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature - .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); - - if let Err(e) = &gc_spawn_result { - let mut m = tenants_state::write_tenants(); - m.get_mut(&tenant_id) - .with_context(|| format!("Tenant not found for id {tenant_id}"))? - .state = old_state; - drop(m); - error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); - return gc_spawn_result; - } + crate::tenant_threads::start_gc_loop(tenant_id)?; } (TenantState::Idle, TenantState::Stopping) => { info!("stopping idle tenant {tenant_id}"); diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 2268846d1e..7f44552904 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,11 +1,16 @@ //! This module contains functions to serve per-tenant background processes, //! such as compaction and GC +use std::str::FromStr; + use crate::repository::Repository; -use crate::tenant_mgr; use crate::tenant_mgr::TenantState; +use crate::thread_mgr::ThreadKind; +use crate::{tenant_mgr, thread_mgr}; use anyhow::Result; -use tokio::task::JoinError; -use std::time::Duration; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use once_cell::sync::OnceCell; +use tokio::sync::mpsc::{self, Sender}; use tracing::*; use utils::zid::ZTenantId; @@ -45,6 +50,58 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { Ok(()) } +static START_GC_LOOP: OnceCell> = OnceCell::new(); + +pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> { + START_GC_LOOP + .get() + .unwrap() + .blocking_send(tenantid) + .unwrap(); + Ok(()) +} + +pub fn init_tenant_task_pool() -> Result<()> { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("tenant-task-worker") + .worker_threads(40) + .max_blocking_threads(100) + .enable_all() + .build()?; + + let (send, mut recv) = mpsc::channel::(100); + START_GC_LOOP.set(send).unwrap(); + + thread_mgr::spawn( + ThreadKind::WalReceiverManager, // TODO + None, + None, + "WAL receiver manager main thread", + true, + move || { + runtime.block_on(async move { + let mut futures = FuturesUnordered::new(); + loop { + tokio::select! { + _ = thread_mgr::shutdown_watcher() => break, + tenantid = recv.recv() => { + let tenantid = tenantid.unwrap(); + // Await concurrently with all other futures + futures.push(tokio::spawn(gc_loop(tenantid))); + }, + _ = futures.next() => { + // Do nothing. We're waiting on them just to make progress + }, + } + } + }); + Ok(()) + }, + )?; + + Ok(()) +} + /// /// GC thread's main loop /// @@ -63,10 +120,11 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> { if gc_horizon > 0 { let gc_result = tokio::task::spawn_blocking(move || { repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) - }).await; + }) + .await; match gc_result { - Ok(Ok(gc_result)) => { + Ok(Ok(_gc_result)) => { // Gc success, do nothing } Ok(Err(e)) => { @@ -80,7 +138,7 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> { } } - tokio::time::sleep(gc_period); + tokio::time::sleep(gc_period).await; } trace!( "GC loop stopped for tenant {} state is {:?}",