Merge branch '22037-basic-fast-import-e2e' into cloud-22775-restore-to-connstring

This commit is contained in:
Gleb Novikov
2025-01-17 17:53:44 +00:00

View File

@@ -87,7 +87,11 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
"C.UTF-8"
};
async fn decode_connstring(kms_client: &aws_sdk_kms::Client, key_id: &String, connstring_ciphertext_base64: Vec<u8>) -> Result<String, anyhow::Error> {
async fn decode_connstring(
kms_client: &aws_sdk_kms::Client,
key_id: &String,
connstring_ciphertext_base64: Vec<u8>,
) -> Result<String, anyhow::Error> {
let mut output = kms_client
.decrypt()
.key_id(key_id)
@@ -103,8 +107,7 @@ async fn decode_connstring(kms_client: &aws_sdk_kms::Client, key_id: &String, co
.take()
.context("get plaintext connection string")?;
String::from_utf8(plaintext.into_inner())
.context("parse connection string as utf8")
String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8")
}
#[tokio::main]
@@ -142,12 +145,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let superuser = "cloud_admin";
let pg_port = || {
if args.pg_port.is_some() {
args.pg_port.unwrap()
} else {
args.pg_port.unwrap_or_else(|| {
info!("pg_port not specified, using default 5432");
5432
}
})
};
let mut run_postgres = true;
@@ -176,13 +177,15 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let source = decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64
).await?;
spec.source_connstring_ciphertext_base64,
)
.await?;
let restore =
if let Some(restore_ciphertext) = spec.restore_connstring_ciphertext_base64 {
run_postgres = false;
decode_connstring(kms_client.as_ref().unwrap(), &key_id, restore_ciphertext).await?
decode_connstring(kms_client.as_ref().unwrap(), &key_id, restore_ciphertext)
.await?
} else {
// restoring to local postgres otherwise
format!(
@@ -251,8 +254,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &pgdata_dir,
})
.await
.context("initdb")?;
.await
.context("initdb")?;
//
// Launch postgres process
@@ -301,7 +304,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
&restore_connstring.replace("dbname=neondb", "dbname=postgres"),
tokio_postgres::NoTls,
)
.await
.await
{
Ok((client, connection)) => {
// Spawn the connection handling task to maintain the connection
@@ -318,10 +321,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
Err(e) => {
warn!(
"failed to create database: {}, retying in {}s",
e,
PG_WAIT_RETRY_INTERVAL.as_secs_f32()
);
"failed to create database: {}, retying in {}s",
e,
PG_WAIT_RETRY_INTERVAL.as_secs_f32()
);
tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
continue;
}
@@ -334,7 +337,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
);
tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
continue;
},
}
}
}
Some(proc)
@@ -420,7 +423,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
pg_restore.stdout.take(),
pg_restore.stderr.take(),
)
.instrument(info_span!("pg_restore")),
.instrument(info_span!("pg_restore")),
);
let st = pg_restore.wait().await.context("wait for pg_restore")?;
info!(status=?st, "pg_restore exited");
@@ -430,45 +433,43 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
if let Some(mut proc) = postgres_proc {
// If interactive mode, wait for Ctrl+C
if args.interactive {
info!("Running in interactive mode. Press Ctrl+C to shut down.");
tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
}
// If interactive mode, wait for Ctrl+C
if args.interactive {
info!("Running in interactive mode. Press Ctrl+C to shut down.");
tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
}
info!("shutdown postgres");
info!("shutdown postgres");
{
nix::sys::signal::kill(
Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")),
nix::sys::signal::SIGTERM,
)
.context("signal postgres to shut down")?;
proc.wait()
.await
.context("wait for postgres to shut down")?;
}
// Only sync if s3_prefix was specified
if let Some(s3_prefix) = args.s3_prefix {
info!("upload pgdata");
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
info!("write status");
{
nix::sys::signal::kill(
Pid::from_raw(
i32::try_from(proc.id().unwrap()).expect("convert child pid to i32"),
),
nix::sys::signal::SIGTERM,
)
.context("signal postgres to shut down")?;
proc.wait()
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("wait for postgres to shut down")?;
}
// Only sync if s3_prefix was specified
if let Some(s3_prefix) = args.s3_prefix {
info!("upload pgdata");
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
info!("write status");
{
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("sync status directory to destination")?;
}
.context("sync status directory to destination")?;
}
}
}
Ok(())