diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index aba6a347aa..275765bba4 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -31,7 +31,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use clap::Parser; use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion}; use nix::unistd::Pid; -use tracing::{info, info_span, warn, Instrument}; +use tracing::{error, info, info_span, warn, Instrument}; use utils::fs_ext::is_directory_empty; #[path = "fast_import/aws_s3_sync.rs"] @@ -41,6 +41,9 @@ mod child_stdio_to_log; #[path = "fast_import/s3_uri.rs"] mod s3_uri; +const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600); +const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300); + #[derive(clap::Parser)] struct Args { #[clap(long)] @@ -96,6 +99,9 @@ pub(crate) async fn main() -> anyhow::Result<()> { if args.s3_prefix.is_none() && args.source_connection_string.is_none() { anyhow::bail!("either s3_prefix or source_connection_string must be specified"); } + if args.s3_prefix.is_some() && args.source_connection_string.is_some() { + anyhow::bail!("only one of s3_prefix or source_connection_string can be specified"); + } let working_directory = args.working_directory; let pg_bin_dir = args.pg_bin_dir; @@ -244,13 +250,23 @@ pub(crate) async fn main() -> anyhow::Result<()> { // Create neondb database in the running postgres let restore_pg_connstring = format!("host=localhost port={pg_port} user={superuser} dbname=postgres"); + + let start_time = std::time::Instant::now(); + loop { + if start_time.elapsed() > PG_WAIT_TIMEOUT { + error!( + "timeout exceeded: failed to poll postgres and create database within 10 minutes" + ); + std::process::exit(1); + } + match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await { Ok((client, connection)) => { // Spawn the connection handling task to maintain the connection tokio::spawn(async move { if let Err(e) = connection.await { - eprintln!("connection error: {}", e); + warn!("connection error: {}", e); } }); @@ -260,12 +276,24 @@ pub(crate) async fn main() -> anyhow::Result<()> { break; } Err(e) => { - info!("failed to create database: {}", e); - break; + warn!( + "failed to create database: {}, retying in {}s", + e, + PG_WAIT_RETRY_INTERVAL.as_secs_f32() + ); + tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await; + continue; } } } - Err(_) => continue, + Err(_) => { + info!( + "postgres not ready yet, retrying in {}s", + PG_WAIT_RETRY_INTERVAL.as_secs_f32() + ); + tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await; + continue; + } } }