diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index ce378770fe..64e67bd6d0 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::ops::ControlFlow; +use std::time::Duration; use crate::repository::Repository; use crate::tenant_mgr::TenantState; @@ -30,10 +31,7 @@ static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { /// /// Compaction task's main loop /// -async fn compaction_loop( - tenantid: ZTenantId, - mut cancel: watch::Receiver<()>, -) -> anyhow::Result<()> { +async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { loop { trace!("waking up"); @@ -49,20 +47,27 @@ async fn compaction_loop( }) .await; - // Handle result - match period { - Ok(Ok(ControlFlow::Continue(period))) => { - tokio::select! { - _ = cancel.changed() => { - trace!("received cancellation request"); - break; - } - _ = tokio::time::sleep(period) => {}, - } - } + // Decide whether to sleep or break + let sleep_duration = match period { + Ok(Ok(ControlFlow::Continue(period))) => period, Ok(Ok(ControlFlow::Break(()))) => break, - Ok(Err(e)) => anyhow::bail!("Compaction failed: {}", e), - Err(e) => anyhow::bail!("Compaction join error: {}", e), + Ok(Err(e)) => { + error!("Compaction failed, retrying: {}", e); + Duration::from_secs(2) + } + Err(e) => { + error!("Compaction join error, retrying: {}", e); + Duration::from_secs(2) + } + }; + + // Sleep + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + }, + _ = tokio::time::sleep(sleep_duration) => {}, } } @@ -70,7 +75,6 @@ async fn compaction_loop( "compaction loop stopped. State is {:?}", tenant_mgr::get_tenant_state(tenantid) ); - Ok(()) } static START_GC_LOOP: OnceCell> = OnceCell::new(); @@ -153,13 +157,13 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { let (cancel_send, cancel_recv) = watch::channel(()); let handle = tokio::spawn(gc_loop(tenantid, cancel_recv) .instrument(trace_span!("gc loop", tenant = %tenantid))); - - // Update metrics, remember handle + cancellation channel - TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); - futures.push(handle); if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) { old_cancel_send.send(()).ok(); } + + // Update metrics, remember handle + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + futures.push(handle); }, tenantid = compaction_recv.recv() => { let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); @@ -168,28 +172,22 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { let (cancel_send, cancel_recv) = watch::channel(()); let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv) .instrument(trace_span!("compaction loop", tenant = %tenantid))); - - // Update metrics, remember handle + cancellation channel - TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); - futures.push(handle); if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) { old_cancel_send.send(()).ok(); } + + // Update metrics, remember handle + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + futures.push(handle); }, result = futures.next() => { - // Log any errors and panics + // Log and count any unhandled panics match result { - Some(Ok(Ok(()))) => { + Some(Ok(())) => { TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); }, - Some(Ok(Err(e))) => { - TENANT_TASK_EVENTS.with_label_values(&["fail"]).inc(); - // TODO which tenant failed? - error!("loop error {}", e) - }, Some(Err(e)) => { - TENANT_TASK_EVENTS.with_label_values(&["fail"]).inc(); - // TODO which tenant failed? + TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc(); error!("loop join error {}", e) }, None => {}, @@ -208,7 +206,7 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { /// /// GC task's main loop /// -async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow::Result<()> { +async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { loop { trace!("waking up"); @@ -229,25 +227,31 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow }) .await; - // Handle result - match period { - Ok(Ok(ControlFlow::Continue(period))) => { - tokio::select! { - _ = cancel.changed() => { - trace!("received cancellation request"); - break; - } - _ = tokio::time::sleep(period) => {}, - } - } + // Decide whether to sleep or break + let sleep_duration = match period { + Ok(Ok(ControlFlow::Continue(period))) => period, Ok(Ok(ControlFlow::Break(()))) => break, - Ok(Err(e)) => anyhow::bail!("Gc failed: {}", e), - Err(e) => anyhow::bail!("Gc join error: {}", e), + Ok(Err(e)) => { + error!("Gc failed, retrying: {}", e); + Duration::from_secs(2) + } + Err(e) => { + error!("Gc join error, retrying: {}", e); + Duration::from_secs(2) + } + }; + + // Sleep + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + }, + _ = tokio::time::sleep(sleep_duration) => {}, } } trace!( "GC loop stopped. State is {:?}", tenant_mgr::get_tenant_state(tenantid) ); - Ok(()) }