diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 5dd8ca4499..d3b779e363 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -172,21 +172,13 @@ impl Cli {
     }
 }
 
-fn main() -> Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
     let cli = Cli::parse();
 
     let scenario = failpoint_support::init();
 
-    // For historical reasons, the main thread that processes the config and launches postgres
-    // is synchronous, but we always have this tokio runtime available and we "enter" it so
-    // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
-    // from all parts of compute_ctl.
-    let runtime = tokio::runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()?;
-    let _rt_guard = runtime.enter();
-
-    runtime.block_on(init())?;
+    init().await?;
 
     // enable core dumping for all child processes
     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -218,7 +210,7 @@ fn main() -> Result<()> {
         config,
     )?;
 
-    let exit_code = compute_node.run()?;
+    let exit_code = compute_node.run().await?;
 
     scenario.teardown();
 
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index bd6ed910be..bf6a88261a 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -15,9 +15,6 @@ use itertools::Itertools;
 use nix::sys::signal::{Signal, kill};
 use nix::unistd::Pid;
 use once_cell::sync::Lazy;
-use postgres;
-use postgres::NoTls;
-use postgres::error::SqlState;
 use remote_storage::{DownloadError, RemotePath};
 use std::collections::{HashMap, HashSet};
 use std::net::SocketAddr;
@@ -30,6 +27,7 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
 use std::time::{Duration, Instant};
 use std::{env, fs};
 use tokio::spawn;
+use tokio_postgres::{NoTls, error::SqlState};
 use tracing::{Instrument, debug, error, info, instrument, warn};
 use url::Url;
 use utils::id::{TenantId, TimelineId};
@@ -386,7 +384,7 @@ impl ComputeNode {
 
     /// Top-level control flow of compute_ctl. Returns a process exit code we should
     /// exit with.
-    pub fn run(self) -> Result<Option<i32>> {
+    pub async fn run(self) -> Result<Option<i32>> {
         let this = Arc::new(self);
 
         let cli_spec = this.state.lock().unwrap().pspec.clone();
@@ -438,7 +436,7 @@ impl ComputeNode {
         let mut vm_monitor = None;
         let mut pg_process: Option<PostgresHandle> = None;
 
-        match this.start_compute(&mut pg_process) {
+        match this.start_compute(&mut pg_process).await {
             Ok(()) => {
                 // Success! Launch remaining services (just vm-monitor currently)
                 vm_monitor =
@@ -487,7 +485,7 @@ impl ComputeNode {
         }
 
         // Reap the postgres process
-        delay_exit |= this.cleanup_after_postgres_exit()?;
+        delay_exit |= this.cleanup_after_postgres_exit().await?;
 
         // If launch failed, keep serving HTTP requests for a while, so the cloud
         // control plane can get the actual error.
@@ -539,7 +537,7 @@ impl ComputeNode {
     ///
     /// Note that this is in the critical path of a compute cold start. Keep this fast.
     /// Try to do things concurrently, to hide the latencies.
-    fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
+    async fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
         let compute_state: ComputeState;
 
         let start_compute_span;
@@ -618,7 +616,7 @@ impl ComputeNode {
         // Prepare pgdata directory. This downloads the basebackup, among other things.
         {
             let (this, cs) = (self.clone(), compute_state.clone());
-            pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs));
+            pre_tasks.spawn(async move { this.prepare_pgdata(&cs).await });
         }
 
         // Resize swap to the desired size if the compute spec says so
@@ -732,8 +730,7 @@ impl ComputeNode {
         let _configurator_handle = launch_configurator(self);
 
         // Wait for all the pre-tasks to finish before starting postgres
-        let rt = tokio::runtime::Handle::current();
-        while let Some(res) = rt.block_on(pre_tasks.join_next()) {
+        while let Some(res) = pre_tasks.join_next().await {
             res??;
         }
 
@@ -862,14 +859,14 @@ impl ComputeNode {
         }
     }
 
-    fn cleanup_after_postgres_exit(&self) -> Result<bool> {
+    async fn cleanup_after_postgres_exit(&self) -> Result<bool> {
         // Maybe sync safekeepers again, to speed up next startup
         let compute_state = self.state.lock().unwrap().clone();
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
             info!("syncing safekeepers on shutdown");
             let storage_auth_token = pspec.storage_auth_token.clone();
-            let lsn = self.sync_safekeepers(storage_auth_token)?;
+            let lsn = self.sync_safekeepers(storage_auth_token).await?;
             info!("synced safekeepers at lsn {lsn}");
         }
 
@@ -964,7 +961,7 @@ impl ComputeNode {
         }
 
         // Connect to pageserver
-        let mut client = config.connect(NoTls)?;
+        let mut client = config.connect(postgres::NoTls)?;
         let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
 
         let basebackup_cmd = match lsn {
@@ -1128,11 +1125,13 @@ impl ComputeNode {
     // Fast path for sync_safekeepers. If they're already synced we get the lsn
     // in one roundtrip. If not, we should do a full sync_safekeepers.
     #[instrument(skip_all)]
-    pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
+    pub async fn check_safekeepers_synced(
+        &self,
+        compute_state: &ComputeState,
+    ) -> Result<Option<Lsn>> {
         let start_time = Utc::now();
 
-        let rt = tokio::runtime::Handle::current();
-        let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
+        let result = self.check_safekeepers_synced_async(compute_state).await;
 
         // Record runtime
         self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
@@ -1146,7 +1145,7 @@ impl ComputeNode {
     // Run `postgres` in a special mode with `--sync-safekeepers` argument
     // and return the reported LSN back to the caller.
     #[instrument(skip_all)]
-    pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
+    pub async fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
         let start_time = Utc::now();
 
         let mut sync_handle = maybe_cgexec(&self.params.pgbin)
@@ -1178,8 +1177,8 @@ impl ComputeNode {
         SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
 
         // Process has exited, so we can join the logs thread.
-        let _ = tokio::runtime::Handle::current()
-            .block_on(logs_handle)
+        let _ = logs_handle
+            .await
             .map_err(|e| tracing::error!("log task panicked: {:?}", e));
 
         if !sync_output.status.success() {
@@ -1205,7 +1204,7 @@ impl ComputeNode {
     /// Do all the preparations like PGDATA directory creation, configuration,
     /// safekeepers sync, basebackup, etc.
     #[instrument(skip_all)]
-    pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
+    pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         let spec = &pspec.spec;
         let pgdata_path = Path::new(&self.params.pgdata);
@@ -1227,11 +1226,13 @@ impl ComputeNode {
         let lsn = match spec.mode {
             ComputeMode::Primary => {
                 info!("checking if safekeepers are synced");
-                let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
+                let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
+                {
                     lsn
                 } else {
                     info!("starting safekeepers syncing");
                     self.sync_safekeepers(pspec.storage_auth_token.clone())
+                        .await
                         .with_context(|| "failed to sync safekeepers")?
                 };
                 info!("safekeepers synced at LSN {}", lsn);