Handle errors inside loop

This commit is contained in:
Bojan Serafimov
2022-06-24 17:08:44 -04:00
parent cdc81996b4
commit 98062865f4

View File

@@ -3,6 +3,7 @@
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::Repository;
use crate::tenant_mgr::TenantState;
@@ -30,10 +31,7 @@ static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
///
/// Compaction task's main loop
///
async fn compaction_loop(
tenantid: ZTenantId,
mut cancel: watch::Receiver<()>,
) -> anyhow::Result<()> {
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
@@ -49,20 +47,27 @@ async fn compaction_loop(
})
.await;
// Handle result
match period {
Ok(Ok(ControlFlow::Continue(period))) => {
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
}
_ = tokio::time::sleep(period) => {},
}
}
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => anyhow::bail!("Compaction failed: {}", e),
Err(e) => anyhow::bail!("Compaction join error: {}", e),
Ok(Err(e)) => {
error!("Compaction failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Compaction join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
@@ -70,7 +75,6 @@ async fn compaction_loop(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
@@ -153,13 +157,13 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(trace_span!("gc loop", tenant = %tenantid)));
// Update metrics, remember handle + cancellation channel
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
@@ -168,28 +172,22 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(trace_span!("compaction loop", tenant = %tenantid)));
// Update metrics, remember handle + cancellation channel
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
result = futures.next() => {
// Log any errors and panics
// Log and count any unhandled panics
match result {
Some(Ok(Ok(()))) => {
Some(Ok(())) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Some(Ok(Err(e))) => {
TENANT_TASK_EVENTS.with_label_values(&["fail"]).inc();
// TODO which tenant failed?
error!("loop error {}", e)
},
Some(Err(e)) => {
TENANT_TASK_EVENTS.with_label_values(&["fail"]).inc();
// TODO which tenant failed?
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
None => {},
@@ -208,7 +206,7 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow::Result<()> {
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
@@ -229,25 +227,31 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow
})
.await;
// Handle result
match period {
Ok(Ok(ControlFlow::Continue(period))) => {
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
}
_ = tokio::time::sleep(period) => {},
}
}
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => anyhow::bail!("Gc failed: {}", e),
Err(e) => anyhow::bail!("Gc join error: {}", e),
Ok(Err(e)) => {
error!("Gc failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Gc join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}