diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 0d70ca430a..8b50776336 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -51,6 +51,7 @@ use tracing::{error, info}; use url::Url; use compute_api::responses::ComputeStatus; +use compute_api::spec::ComputeSpec; use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, @@ -68,6 +69,20 @@ use compute_tools::spec::*; const BUILD_TAG_DEFAULT: &str = "latest"; fn main() -> Result<()> { + let (build_tag, clap_args) = init()?; + + let (startup_context_guard, cli_result) = process_cli(&clap_args)?; + + let wait_spec_result = wait_spec(build_tag, cli_result)?; + + let (pg_handle, start_pg_result) = start_postgres(&clap_args, wait_spec_result)?; + + let wait_pg_result = wait_postgres(pg_handle, startup_context_guard)?; + + cleanup_and_exit(start_pg_result, wait_pg_result) +} + +fn init() -> Result<(String, clap::ArgMatches)> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; @@ -82,7 +97,12 @@ fn main() -> Result<()> { .to_string(); info!("build_tag: {build_tag}"); - let matches = cli().get_matches(); + Ok((build_tag, cli().get_matches())) +} + +fn process_cli( + matches: &clap::ArgMatches, +) -> Result<(Option, ProcessCliResult)> { let pgbin_default = "postgres"; let pgbin = matches .get_one::("pgbin") @@ -202,6 +222,47 @@ fn main() -> Result<()> { } }; + let result = ProcessCliResult { + // directly from CLI: + connstr, + pgdata, + pgbin, + ext_remote_storage, + http_port, + // others: + spec, + live_config_allowed, + }; + + // TODO: Move startup_context_guard out of this function. It's here right now only because + // that's where it was before, and moving it would've made the diff big. + Ok((startup_context_guard, result)) +} + +struct ProcessCliResult<'clap> { + connstr: &'clap str, + pgdata: &'clap str, + pgbin: &'clap str, + ext_remote_storage: Option<&'clap str>, + http_port: u16, + + /// If a spec was provided via CLI or file, the [`ComputeSpec`] + spec: Option, + live_config_allowed: bool, +} + +fn wait_spec( + build_tag: String, + ProcessCliResult { + connstr, + pgdata, + pgbin, + ext_remote_storage, + http_port, + spec, + live_config_allowed, + }: ProcessCliResult, +) -> Result { let mut new_state = ComputeState::new(); let spec_set; @@ -256,6 +317,19 @@ fn main() -> Result<()> { } } + Ok(WaitSpecResult { compute, http_port }) +} + +struct WaitSpecResult { + compute: Arc, + // passed through from ProcessCliResult + http_port: u16, +} + +fn start_postgres( + matches: &clap::ArgMatches, + WaitSpecResult { compute, http_port }: WaitSpecResult, +) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); @@ -336,7 +410,7 @@ fn main() -> Result<()> { // This token is used internally by the monitor to clean up all threads let token = CancellationToken::new(); - let vm_monitor = &rt.as_ref().map(|rt| { + let vm_monitor = rt.as_ref().map(|rt| { rt.spawn(vm_monitor::start( Box::leak(Box::new(vm_monitor::Args { cgroup: cgroup.cloned(), @@ -349,11 +423,47 @@ fn main() -> Result<()> { } } + Ok(( + pg, + StartPostgresResult { + delay_exit, + compute, + #[cfg(target_os = "linux")] + rt, + #[cfg(target_os = "linux")] + token, + #[cfg(target_os = "linux")] + vm_monitor, + }, + )) +} + +type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>); + +struct StartPostgresResult { + delay_exit: bool, + // passed through from WaitSpecResult + compute: Arc, + + #[cfg(target_os = "linux")] + rt: Option, + #[cfg(target_os = "linux")] + token: tokio_util::sync::CancellationToken, + #[cfg(target_os = "linux")] + vm_monitor: Option>>, +} + +fn wait_postgres( + pg: Option, + startup_context_guard: Option, +) -> Result { // Wait for the child Postgres process forever. In this state Ctrl+C will // propagate to Postgres and it will be shut down as well. let mut exit_code = None; if let Some((mut pg, logs_handle)) = pg { // Startup is finished, exit the startup tracing span + // TODO: Probably easier to drop startup_context_guard outside this function. It's here + // right now because keeping it here reduced the size of the diff. drop(startup_context_guard); let ecode = pg @@ -370,6 +480,26 @@ fn main() -> Result<()> { exit_code = ecode.code() } + Ok(WaitPostgresResult { exit_code }) +} + +struct WaitPostgresResult { + exit_code: Option, +} + +fn cleanup_and_exit( + StartPostgresResult { + mut delay_exit, + compute, + #[cfg(target_os = "linux")] + vm_monitor, + #[cfg(target_os = "linux")] + token, + #[cfg(target_os = "linux")] + rt, + }: StartPostgresResult, + WaitPostgresResult { exit_code }: WaitPostgresResult, +) -> Result<()> { // Terminate the vm_monitor so it releases the file watcher on // /sys/fs/cgroup/neon-postgres. // Note: the vm-monitor only runs on linux because it requires cgroups.