cleaner error propagation in thread creation

This commit is contained in:
Alek Westover
2023-07-05 09:56:03 -04:00
parent 7e4b55a933
commit 3d402f39e6
4 changed files with 24 additions and 29 deletions

View File

@@ -244,7 +244,8 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
let _download_extensions_handle = launch_download_extensions(&compute);
let _download_extensions_handle =
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
// Start Postgres
let mut delay_exit = false;

View File

@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_configurator(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
})
}

View File

@@ -1,5 +1,6 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use crate::compute::ComputeNode;
use anyhow::{self, bail, Context, Result};
use futures::future::join_all;
use remote_storage::*;
@@ -10,13 +11,10 @@ use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
use crate::compute::ComputeNode;
use std::sync::Arc;
use std::thread;
use tracing::instrument;
use tokio::io::AsyncReadExt;
use tracing::info;
// remote!
const SHARE_EXT_PATH: &str = "share/extension";
@@ -432,24 +430,18 @@ async fn organized_extension_files(
Ok((grouped_dependencies, control_files))
}
#[instrument(skip_all)]
fn prepare_external_extensions(compute: &Arc<ComputeNode>) -> Result<()> {
info!("start download_extension_files");
let compute_state = compute.state.lock().unwrap();
compute.prepare_external_extensions(&compute_state)?;
info!("download_extension_files done");
Ok(())
}
pub fn launch_download_extensions(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_download_extensions(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
thread::Builder::new()
.name("download-extensions".into())
.spawn(move || {
// FIX unwrap
prepare_external_extensions(&compute).unwrap();
info!("download_extensions_thread is exited");
})?)
info!("start download_extension_files");
let compute_state = compute.state.lock().expect("error unlocking compute.state");
compute
.prepare_external_extensions(&compute_state)
.expect("error preparing extensions");
info!("download_extension_files done, exiting thread");
})
}

View File

@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))?)
.spawn(move || watch_compute_activity(&state))
}