From c9472434c99d3c92168101eb76e846150619aa79 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Wed, 1 May 2024 12:01:18 -0700 Subject: [PATCH] compute_ctl: Break up main() into discrete phases This commit is intentionally designed to have as small a diff as possible. To that end, the basic idea is that each distinct "chunk" of the previous main() has been wrapped in its own function, with the return values from each function being passed directly into the next. The structure of main() is now visible from its contents: 1. init() 2. process_cli() 3. wait_spec() 4. start_postgres() 5. wait_postgres() 6. cleanup_and_exit() There's a lot of other work that can / should(?) be done beyond this, but I figure that's more opinionated, and this should be a solid start. --- compute_tools/src/bin/compute_ctl.rs | 134 ++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 2 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 0d70ca430a..8b50776336 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -51,6 +51,7 @@ use tracing::{error, info}; use url::Url; use compute_api::responses::ComputeStatus; +use compute_api::spec::ComputeSpec; use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, @@ -68,6 +69,20 @@ use compute_tools::spec::*; const BUILD_TAG_DEFAULT: &str = "latest"; fn main() -> Result<()> { + let (build_tag, clap_args) = init()?; + + let (startup_context_guard, cli_result) = process_cli(&clap_args)?; + + let wait_spec_result = wait_spec(build_tag, cli_result)?; + + let (pg_handle, start_pg_result) = start_postgres(&clap_args, wait_spec_result)?; + + let wait_pg_result = wait_postgres(pg_handle, startup_context_guard)?; + + cleanup_and_exit(start_pg_result, wait_pg_result) +} + +fn init() -> Result<(String, clap::ArgMatches)> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; @@ -82,7 +97,12 @@ fn main() -> Result<()> { .to_string(); info!("build_tag: {build_tag}"); - let matches = cli().get_matches(); + Ok((build_tag, cli().get_matches())) +} + +fn process_cli( + matches: &clap::ArgMatches, +) -> Result<(Option, ProcessCliResult)> { let pgbin_default = "postgres"; let pgbin = matches .get_one::("pgbin") @@ -202,6 +222,47 @@ fn main() -> Result<()> { } }; + let result = ProcessCliResult { + // directly from CLI: + connstr, + pgdata, + pgbin, + ext_remote_storage, + http_port, + // others: + spec, + live_config_allowed, + }; + + // TODO: Move startup_context_guard out of this function. It's here right now only because + // that's where it was before, and moving it would've made the diff big. + Ok((startup_context_guard, result)) +} + +struct ProcessCliResult<'clap> { + connstr: &'clap str, + pgdata: &'clap str, + pgbin: &'clap str, + ext_remote_storage: Option<&'clap str>, + http_port: u16, + + /// If a spec was provided via CLI or file, the [`ComputeSpec`] + spec: Option, + live_config_allowed: bool, +} + +fn wait_spec( + build_tag: String, + ProcessCliResult { + connstr, + pgdata, + pgbin, + ext_remote_storage, + http_port, + spec, + live_config_allowed, + }: ProcessCliResult, +) -> Result { let mut new_state = ComputeState::new(); let spec_set; @@ -256,6 +317,19 @@ fn main() -> Result<()> { } } + Ok(WaitSpecResult { compute, http_port }) +} + +struct WaitSpecResult { + compute: Arc, + // passed through from ProcessCliResult + http_port: u16, +} + +fn start_postgres( + matches: &clap::ArgMatches, + WaitSpecResult { compute, http_port }: WaitSpecResult, +) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); @@ -336,7 +410,7 @@ fn main() -> Result<()> { // This token is used internally by the monitor to clean up all threads let token = CancellationToken::new(); - let vm_monitor = &rt.as_ref().map(|rt| { + let vm_monitor = rt.as_ref().map(|rt| { rt.spawn(vm_monitor::start( Box::leak(Box::new(vm_monitor::Args { cgroup: cgroup.cloned(), @@ -349,11 +423,47 @@ fn main() -> Result<()> { } } + Ok(( + pg, + StartPostgresResult { + delay_exit, + compute, + #[cfg(target_os = "linux")] + rt, + #[cfg(target_os = "linux")] + token, + #[cfg(target_os = "linux")] + vm_monitor, + }, + )) +} + +type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>); + +struct StartPostgresResult { + delay_exit: bool, + // passed through from WaitSpecResult + compute: Arc, + + #[cfg(target_os = "linux")] + rt: Option, + #[cfg(target_os = "linux")] + token: tokio_util::sync::CancellationToken, + #[cfg(target_os = "linux")] + vm_monitor: Option>>, +} + +fn wait_postgres( + pg: Option, + startup_context_guard: Option, +) -> Result { // Wait for the child Postgres process forever. In this state Ctrl+C will // propagate to Postgres and it will be shut down as well. let mut exit_code = None; if let Some((mut pg, logs_handle)) = pg { // Startup is finished, exit the startup tracing span + // TODO: Probably easier to drop startup_context_guard outside this function. It's here + // right now because keeping it here reduced the size of the diff. drop(startup_context_guard); let ecode = pg @@ -370,6 +480,26 @@ fn main() -> Result<()> { exit_code = ecode.code() } + Ok(WaitPostgresResult { exit_code }) +} + +struct WaitPostgresResult { + exit_code: Option, +} + +fn cleanup_and_exit( + StartPostgresResult { + mut delay_exit, + compute, + #[cfg(target_os = "linux")] + vm_monitor, + #[cfg(target_os = "linux")] + token, + #[cfg(target_os = "linux")] + rt, + }: StartPostgresResult, + WaitPostgresResult { exit_code }: WaitPostgresResult, +) -> Result<()> { // Terminate the vm_monitor so it releases the file watcher on // /sys/fs/cgroup/neon-postgres. // Note: the vm-monitor only runs on linux because it requires cgroups.