diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index 346381f4d9..8017c7b94d 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -87,7 +87,11 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") { "C.UTF-8" }; -async fn decode_connstring(kms_client: &aws_sdk_kms::Client, key_id: &String, connstring_ciphertext_base64: Vec) -> Result { +async fn decode_connstring( + kms_client: &aws_sdk_kms::Client, + key_id: &String, + connstring_ciphertext_base64: Vec, +) -> Result { let mut output = kms_client .decrypt() .key_id(key_id) @@ -103,8 +107,7 @@ async fn decode_connstring(kms_client: &aws_sdk_kms::Client, key_id: &String, co .take() .context("get plaintext connection string")?; - String::from_utf8(plaintext.into_inner()) - .context("parse connection string as utf8") + String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8") } #[tokio::main] @@ -142,12 +145,10 @@ pub(crate) async fn main() -> anyhow::Result<()> { let superuser = "cloud_admin"; let pg_port = || { - if args.pg_port.is_some() { - args.pg_port.unwrap() - } else { + args.pg_port.unwrap_or_else(|| { info!("pg_port not specified, using default 5432"); 5432 - } + }) }; let mut run_postgres = true; @@ -176,13 +177,15 @@ pub(crate) async fn main() -> anyhow::Result<()> { let source = decode_connstring( kms_client.as_ref().unwrap(), &key_id, - spec.source_connstring_ciphertext_base64 - ).await?; + spec.source_connstring_ciphertext_base64, + ) + .await?; let restore = if let Some(restore_ciphertext) = spec.restore_connstring_ciphertext_base64 { run_postgres = false; - decode_connstring(kms_client.as_ref().unwrap(), &key_id, restore_ciphertext).await? + decode_connstring(kms_client.as_ref().unwrap(), &key_id, restore_ciphertext) + .await? } else { // restoring to local postgres otherwise format!( @@ -251,8 +254,8 @@ pub(crate) async fn main() -> anyhow::Result<()> { library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local. pgdata: &pgdata_dir, }) - .await - .context("initdb")?; + .await + .context("initdb")?; // // Launch postgres process @@ -301,7 +304,7 @@ pub(crate) async fn main() -> anyhow::Result<()> { &restore_connstring.replace("dbname=neondb", "dbname=postgres"), tokio_postgres::NoTls, ) - .await + .await { Ok((client, connection)) => { // Spawn the connection handling task to maintain the connection @@ -318,10 +321,10 @@ pub(crate) async fn main() -> anyhow::Result<()> { } Err(e) => { warn!( - "failed to create database: {}, retying in {}s", - e, - PG_WAIT_RETRY_INTERVAL.as_secs_f32() - ); + "failed to create database: {}, retying in {}s", + e, + PG_WAIT_RETRY_INTERVAL.as_secs_f32() + ); tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await; continue; } @@ -334,7 +337,7 @@ pub(crate) async fn main() -> anyhow::Result<()> { ); tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await; continue; - }, + } } } Some(proc) @@ -420,7 +423,7 @@ pub(crate) async fn main() -> anyhow::Result<()> { pg_restore.stdout.take(), pg_restore.stderr.take(), ) - .instrument(info_span!("pg_restore")), + .instrument(info_span!("pg_restore")), ); let st = pg_restore.wait().await.context("wait for pg_restore")?; info!(status=?st, "pg_restore exited"); @@ -430,45 +433,43 @@ pub(crate) async fn main() -> anyhow::Result<()> { } if let Some(mut proc) = postgres_proc { - // If interactive mode, wait for Ctrl+C - if args.interactive { - info!("Running in interactive mode. Press Ctrl+C to shut down."); - tokio::signal::ctrl_c().await.context("wait for ctrl-c")?; - } + // If interactive mode, wait for Ctrl+C + if args.interactive { + info!("Running in interactive mode. Press Ctrl+C to shut down."); + tokio::signal::ctrl_c().await.context("wait for ctrl-c")?; + } - info!("shutdown postgres"); + info!("shutdown postgres"); + { + nix::sys::signal::kill( + Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")), + nix::sys::signal::SIGTERM, + ) + .context("signal postgres to shut down")?; + proc.wait() + .await + .context("wait for postgres to shut down")?; + } + + // Only sync if s3_prefix was specified + if let Some(s3_prefix) = args.s3_prefix { + info!("upload pgdata"); + aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/")) + .await + .context("sync dump directory to destination")?; + + info!("write status"); { - nix::sys::signal::kill( - Pid::from_raw( - i32::try_from(proc.id().unwrap()).expect("convert child pid to i32"), - ), - nix::sys::signal::SIGTERM, - ) - .context("signal postgres to shut down")?; - proc.wait() + let status_dir = working_directory.join("status"); + std::fs::create_dir(&status_dir).context("create status directory")?; + let status_file = status_dir.join("pgdata"); + std::fs::write(&status_file, serde_json::json!({"done": true}).to_string()) + .context("write status file")?; + aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/")) .await - .context("wait for postgres to shut down")?; - } - - // Only sync if s3_prefix was specified - if let Some(s3_prefix) = args.s3_prefix { - info!("upload pgdata"); - aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/")) - .await - .context("sync dump directory to destination")?; - - info!("write status"); - { - let status_dir = working_directory.join("status"); - std::fs::create_dir(&status_dir).context("create status directory")?; - let status_file = status_dir.join("pgdata"); - std::fs::write(&status_file, serde_json::json!({"done": true}).to_string()) - .context("write status file")?; - aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/")) - .await - .context("sync status directory to destination")?; - } + .context("sync status directory to destination")?; } + } } Ok(())