This commit is contained in:
Vlad Lazar
2025-01-23 16:20:02 +01:00
parent 156b798d1d
commit 6c1de868dc

View File

@@ -31,6 +31,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
use nix::unistd::Pid;
use tokio::io::BufReader;
use tracing::{error, info, info_span, warn, Instrument};
use utils::fs_ext::is_directory_empty;
@@ -405,8 +406,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
None
};
let dumpdir = working_directory.join("dumpdir");
let common_args = [
// schema mapping (prob suffices to specify them on one side)
"--no-owner".to_string(),
@@ -417,46 +416,49 @@ pub(crate) async fn main() -> anyhow::Result<()> {
"--no-tablespaces".to_string(),
// format
"--format".to_string(),
"directory".to_string(),
"custom".to_string(),
// concurrency
"--jobs".to_string(),
num_cpus::get().to_string(),
// "--jobs".to_string(),
// num_cpus::get().to_string(),
// progress updates
"--verbose".to_string(),
];
info!("dump into the working directory");
{
let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
.args(&common_args)
.arg("-f")
.arg(&dumpdir)
.arg("--no-sync")
// POSITIONAL args
// source db (db name included in connection string)
.arg(&source_connstring)
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn pg_dump")?;
let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
.args(&common_args)
.arg("--no-sync")
// POSITIONAL args
// source db (db name included in connection string)
.arg(&source_connstring)
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn pg_dump")?;
const BUF_SIZE: usize = 64 * 1024 * 1024;
let mut buf = BufReader::with_capacity(BUF_SIZE, pg_dump.stdout.take().unwrap());
tokio::spawn(async move {
info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");
tokio::spawn(
child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
.instrument(info_span!("pg_dump")),
);
let st = pg_dump.wait().await.context("wait for pg_dump")?;
let st = pg_dump.wait().await.expect("wait for pg_dump");
info!(status=?st, "pg_dump exited");
if !st.success() {
warn!(status=%st, "pg_dump failed, restore will likely fail as well");
}
}
tokio::spawn(
child_stdio_to_log::relay_process_output(
None,
pg_dump.stderr.take(),
)
.instrument(info_span!("pg_dump")),
);
});
// TODO: do it in a streaming way, plenty of internal research done on this already
// TODO: do the unlogged table trick
@@ -465,14 +467,15 @@ pub(crate) async fn main() -> anyhow::Result<()> {
.args(&common_args)
.arg("-d")
.arg(&restore_connstring)
// POSITIONAL args
.arg(&dumpdir)
.arg("--clean")
.arg("--if-exists")
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.spawn()
.context("spawn pg_restore")?;
@@ -484,11 +487,19 @@ pub(crate) async fn main() -> anyhow::Result<()> {
)
.instrument(info_span!("pg_restore")),
);
let mut restore_stdin = pg_restore.stdin.take().unwrap();
tokio::spawn(async move {
tokio::io::copy_buf(&mut buf, &mut restore_stdin).await.expect("pg_restore failed to read from pg_dump");
});
let st = pg_restore.wait().await.context("wait for pg_restore")?;
info!(status=?st, "pg_restore exited");
if !st.success() {
warn!(status=%st, "pg_restore failed, restore will likely fail as well");
}
}
if let Some(mut proc) = postgres_proc {