Log errors

This commit is contained in:
Bojan Serafimov
2022-06-24 11:05:07 -04:00
parent 796ee4d8af
commit 3a23869780

View File

@@ -9,6 +9,8 @@ use crate::tenant_mgr::TenantState;
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
use tokio::sync::watch;
@@ -50,15 +52,9 @@ async fn compaction_loop(
_ = tokio::time::sleep(period) => {},
}
}
Ok(Ok(ControlFlow::Break(()))) => {
break;
}
Ok(Err(e)) => {
anyhow::bail!("Compaction failed: {}", e);
}
Err(e) => {
anyhow::bail!("Compaction join error: {}", e);
}
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => anyhow::bail!("Compaction failed: {}", e),
Err(e) => anyhow::bail!("Compaction join error: {}", e),
}
}
@@ -128,6 +124,7 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
@@ -146,8 +143,9 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let _handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(trace_span!("gc loop", tenant = %tenantid)));
futures.push(handle);
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
@@ -157,13 +155,22 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let _handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(trace_span!("compaction loop", tenant = %tenantid)));
futures.push(handle);
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
},
// TODO await return values? Report errors?
result = futures.next() => {
// Log any errors and panics
match result {
Some(Ok(Ok(()))) => {},
Some(Ok(Err(e))) => error!("loop error {}", e),
Some(Err(e)) => error!("loop join error {}", e),
None => {},
};
},
}
}
});
@@ -209,15 +216,9 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) -> anyhow
_ = tokio::time::sleep(period) => {},
}
}
Ok(Ok(ControlFlow::Break(()))) => {
break;
}
Ok(Err(e)) => {
anyhow::bail!("Gc failed: {}", e);
}
Err(e) => {
anyhow::bail!("Gc join error: {}", e);
}
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => anyhow::bail!("Gc failed: {}", e),
Err(e) => anyhow::bail!("Gc join error: {}", e),
}
}
trace!(