diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 2b82b706fc..69e2e0f5f8 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -9,6 +9,8 @@ use crate::tenant_mgr::TenantState; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; use anyhow::{self, Context}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use once_cell::sync::OnceCell; use tokio::sync::mpsc; use tokio::sync::watch; @@ -50,15 +52,9 @@ async fn compaction_loop( _ = tokio::time::sleep(period) => {}, } } - Ok(Ok(ControlFlow::Break(()))) => { - break; - } - Ok(Err(e)) => { - anyhow::bail!("Compaction failed: {}", e); - } - Err(e) => { - anyhow::bail!("Compaction join error: {}", e); - } + Ok(Ok(ControlFlow::Break(()))) => break, + Ok(Err(e)) => anyhow::bail!("Compaction failed: {}", e), + Err(e) => anyhow::bail!("Compaction join error: {}", e), } } @@ -128,6 +124,7 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { true, move || { runtime.block_on(async move { + let mut futures = FuturesUnordered::new(); loop { tokio::select! { _ = thread_mgr::shutdown_watcher() => { @@ -146,8 +143,9 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { // Spawn new task, request cancellation of the old one if exists let (cancel_send, cancel_recv) = watch::channel(()); - let _handle = tokio::spawn(gc_loop(tenantid, cancel_recv) + let handle = tokio::spawn(gc_loop(tenantid, cancel_recv) .instrument(trace_span!("gc loop", tenant = %tenantid))); + futures.push(handle); if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) { old_cancel_send.send(()).ok(); } @@ -157,13 +155,22 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { // Spawn new task, request cancellation of the old one if exists let (cancel_send, cancel_recv) = watch::channel(()); - let _handle = tokio::spawn(compaction_loop(tenantid, cancel_recv) + let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv) .instrument(trace_span!("compaction loop", tenant = %tenantid))); + futures.push(handle); if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) { old_cancel_send.send(()).ok(); } }, - // TODO await return values? Report errors? + result = futures.next() => { + // Log any errors and panics + match result { + Some(Ok(Ok(()))) => {}, + Some(Ok(Err(e))) => error!("loop error {}", e), + Some(Err(e)) => error!("loop join error {}", e), + None => {}, + }; + }, } } }); @@ -209,15 +216,9 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow _ = tokio::time::sleep(period) => {}, } } - Ok(Ok(ControlFlow::Break(()))) => { - break; - } - Ok(Err(e)) => { - anyhow::bail!("Gc failed: {}", e); - } - Err(e) => { - anyhow::bail!("Gc join error: {}", e); - } + Ok(Ok(ControlFlow::Break(()))) => break, + Ok(Err(e)) => anyhow::bail!("Gc failed: {}", e), + Err(e) => anyhow::bail!("Gc join error: {}", e), } } trace!(