Mark compute_ctl::main as #[tokio::main]

Historically, main was a synchronous function, but we use async code
extensively within compute_ctl, so we might as well make it easy for
people to add async code in the future.

Signed-off-by: Tristan Partin <tristan@neon.tech>
Author: Tristan Partin
Date:   2025-05-19 13:38:51 -05:00
Commit: 681edf3983
Parent: ab898e40b0

2 changed files with 26 additions and 33 deletions
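
For context, the #[tokio::main] attribute generates roughly the same boilerplate that main previously wrote by hand: a synchronous main that builds a multi-threaded runtime and calls block_on on the async body. A simplified sketch of the expansion, assuming the anyhow::Result alias used by main (not the literal macro output, which calls .expect() on build()):

// Hand-rolled equivalent of what #[tokio::main] generates (simplified sketch).
fn main() -> anyhow::Result<()> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(async {
            // ...the body of the attributed `async fn main` runs here...
            Ok(())
        })
}

With the attribute in place, the hand-rolled Builder, the enter() guard, and the runtime.block_on(init()) call removed in the diff below are no longer needed, because every caller already runs inside the runtime.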


@@ -172,21 +172,13 @@ impl Cli {
     }
 }
-fn main() -> Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
     let cli = Cli::parse();
     let scenario = failpoint_support::init();
-    // For historical reasons, the main thread that processes the config and launches postgres
-    // is synchronous, but we always have this tokio runtime available and we "enter" it so
-    // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
-    // from all parts of compute_ctl.
-    let runtime = tokio::runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()?;
-    let _rt_guard = runtime.enter();
-    runtime.block_on(init())?;
+    init().await?;
     // enable core dumping for all child processes
     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -218,7 +210,7 @@ fn main() -> Result<()> {
         config,
     )?;
-    let exit_code = compute_node.run()?;
+    let exit_code = compute_node.run().await?;
     scenario.teardown();
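
The same conversion repeats throughout the ComputeNode code below: call sites that used to bridge into the runtime with tokio::runtime::Handle::current().block_on(...), which only worked because the old synchronous main entered the runtime and kept the guard alive, become plain .await points once their callers are async. A minimal, self-contained sketch of that before/after pattern, using a hypothetical fetch() helper:

// Hypothetical async helper, used only to illustrate the pattern.
async fn fetch() -> u32 {
    42
}

// Old pattern: a synchronous caller bridges into the runtime with block_on.
// This only works on a thread that is not a runtime worker, e.g. the old
// synchronous main() holding a runtime.enter() guard; calling it from inside
// the runtime itself would panic.
fn caller_sync() -> u32 {
    tokio::runtime::Handle::current().block_on(fetch())
}

// New pattern: the caller is async itself and simply awaits.
async fn caller_async() -> u32 {
    fetch().await
}

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    // Entering the runtime makes Handle::current() available on this thread,
    // mirroring the _rt_guard removed from the old main().
    let _guard = runtime.enter();

    assert_eq!(caller_sync(), 42);
    assert_eq!(runtime.block_on(caller_async()), 42);
}

Note that the basebackup path below still uses the blocking postgres client; it is now spelled postgres::NoTls because the unqualified NoTls and SqlState imports come from tokio_postgres after this change.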


@@ -15,9 +15,6 @@ use itertools::Itertools;
 use nix::sys::signal::{Signal, kill};
 use nix::unistd::Pid;
 use once_cell::sync::Lazy;
-use postgres;
-use postgres::NoTls;
-use postgres::error::SqlState;
 use remote_storage::{DownloadError, RemotePath};
 use std::collections::{HashMap, HashSet};
 use std::net::SocketAddr;
@@ -30,6 +27,7 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
 use std::time::{Duration, Instant};
 use std::{env, fs};
 use tokio::spawn;
+use tokio_postgres::{NoTls, error::SqlState};
 use tracing::{Instrument, debug, error, info, instrument, warn};
 use url::Url;
 use utils::id::{TenantId, TimelineId};
@@ -386,7 +384,7 @@ impl ComputeNode {
     /// Top-level control flow of compute_ctl. Returns a process exit code we should
     /// exit with.
-    pub fn run(self) -> Result<Option<i32>> {
+    pub async fn run(self) -> Result<Option<i32>> {
         let this = Arc::new(self);
         let cli_spec = this.state.lock().unwrap().pspec.clone();
@@ -438,7 +436,7 @@ impl ComputeNode {
         let mut vm_monitor = None;
         let mut pg_process: Option<PostgresHandle> = None;
-        match this.start_compute(&mut pg_process) {
+        match this.start_compute(&mut pg_process).await {
             Ok(()) => {
                 // Success! Launch remaining services (just vm-monitor currently)
                 vm_monitor =
@@ -487,7 +485,7 @@ impl ComputeNode {
         }
         // Reap the postgres process
-        delay_exit |= this.cleanup_after_postgres_exit()?;
+        delay_exit |= this.cleanup_after_postgres_exit().await?;
         // If launch failed, keep serving HTTP requests for a while, so the cloud
         // control plane can get the actual error.
@@ -539,7 +537,7 @@ impl ComputeNode {
     ///
     /// Note that this is in the critical path of a compute cold start. Keep this fast.
     /// Try to do things concurrently, to hide the latencies.
-    fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
+    async fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
         let compute_state: ComputeState;
         let start_compute_span;
@@ -618,7 +616,7 @@ impl ComputeNode {
         // Prepare pgdata directory. This downloads the basebackup, among other things.
         {
             let (this, cs) = (self.clone(), compute_state.clone());
-            pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs));
+            pre_tasks.spawn(async move { this.prepare_pgdata(&cs).await });
         }
         // Resize swap to the desired size if the compute spec says so
@@ -732,8 +730,7 @@ impl ComputeNode {
         let _configurator_handle = launch_configurator(self);
         // Wait for all the pre-tasks to finish before starting postgres
-        let rt = tokio::runtime::Handle::current();
-        while let Some(res) = rt.block_on(pre_tasks.join_next()) {
+        while let Some(res) = pre_tasks.join_next().await {
             res??;
         }
@@ -862,14 +859,14 @@ impl ComputeNode {
         }
     }
-    fn cleanup_after_postgres_exit(&self) -> Result<bool> {
+    async fn cleanup_after_postgres_exit(&self) -> Result<bool> {
         // Maybe sync safekeepers again, to speed up next startup
         let compute_state = self.state.lock().unwrap().clone();
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
             info!("syncing safekeepers on shutdown");
             let storage_auth_token = pspec.storage_auth_token.clone();
-            let lsn = self.sync_safekeepers(storage_auth_token)?;
+            let lsn = self.sync_safekeepers(storage_auth_token).await?;
             info!("synced safekeepers at lsn {lsn}");
         }
@@ -964,7 +961,7 @@ impl ComputeNode {
         }
         // Connect to pageserver
-        let mut client = config.connect(NoTls)?;
+        let mut client = config.connect(postgres::NoTls)?;
         let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
         let basebackup_cmd = match lsn {
@@ -1128,11 +1125,13 @@ impl ComputeNode {
     // Fast path for sync_safekeepers. If they're already synced we get the lsn
     // in one roundtrip. If not, we should do a full sync_safekeepers.
     #[instrument(skip_all)]
-    pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
+    pub async fn check_safekeepers_synced(
+        &self,
+        compute_state: &ComputeState,
+    ) -> Result<Option<Lsn>> {
         let start_time = Utc::now();
-        let rt = tokio::runtime::Handle::current();
-        let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
+        let result = self.check_safekeepers_synced_async(compute_state).await;
         // Record runtime
         self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
@@ -1146,7 +1145,7 @@ impl ComputeNode {
     // Run `postgres` in a special mode with `--sync-safekeepers` argument
     // and return the reported LSN back to the caller.
     #[instrument(skip_all)]
-    pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
+    pub async fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
         let start_time = Utc::now();
         let mut sync_handle = maybe_cgexec(&self.params.pgbin)
@@ -1178,8 +1177,8 @@ impl ComputeNode {
         SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
         // Process has exited, so we can join the logs thread.
-        let _ = tokio::runtime::Handle::current()
-            .block_on(logs_handle)
+        let _ = logs_handle
+            .await
             .map_err(|e| tracing::error!("log task panicked: {:?}", e));
         if !sync_output.status.success() {
@@ -1205,7 +1204,7 @@ impl ComputeNode {
     /// Do all the preparations like PGDATA directory creation, configuration,
     /// safekeepers sync, basebackup, etc.
     #[instrument(skip_all)]
-    pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
+    pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         let spec = &pspec.spec;
         let pgdata_path = Path::new(&self.params.pgdata);
@@ -1227,11 +1226,13 @@ impl ComputeNode {
         let lsn = match spec.mode {
             ComputeMode::Primary => {
                 info!("checking if safekeepers are synced");
-                let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
+                let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
+                {
                     lsn
                 } else {
                     info!("starting safekeepers syncing");
                     self.sync_safekeepers(pspec.storage_auth_token.clone())
+                        .await
                         .with_context(|| "failed to sync safekeepers")?
                 };
                 info!("safekeepers synced at LSN {}", lsn);