diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index 8a6fc3cfae..33341b22ee 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -31,6 +31,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use clap::Parser; use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion}; use nix::unistd::Pid; +use tokio::io::BufReader; use tracing::{error, info, info_span, warn, Instrument}; use utils::fs_ext::is_directory_empty; @@ -405,8 +406,6 @@ pub(crate) async fn main() -> anyhow::Result<()> { None }; - let dumpdir = working_directory.join("dumpdir"); - let common_args = [ // schema mapping (prob suffices to specify them on one side) "--no-owner".to_string(), @@ -417,46 +416,49 @@ pub(crate) async fn main() -> anyhow::Result<()> { "--no-tablespaces".to_string(), // format "--format".to_string(), - "directory".to_string(), + "custom".to_string(), // concurrency - "--jobs".to_string(), - num_cpus::get().to_string(), + // "--jobs".to_string(), + // num_cpus::get().to_string(), // progress updates "--verbose".to_string(), ]; info!("dump into the working directory"); - { - let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump")) - .args(&common_args) - .arg("-f") - .arg(&dumpdir) - .arg("--no-sync") - // POSITIONAL args - // source db (db name included in connection string) - .arg(&source_connstring) - // how we run it - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir) - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .context("spawn pg_dump")?; + let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump")) + .args(&common_args) + .arg("--no-sync") + // POSITIONAL args + // source db (db name included in connection string) + .arg(&source_connstring) + // how we run it + .env_clear() + .env("LD_LIBRARY_PATH", &pg_lib_dir) + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn pg_dump")?; + const BUF_SIZE: usize = 64 * 1024 * 1024; + let mut buf = BufReader::with_capacity(BUF_SIZE, pg_dump.stdout.take().unwrap()); + + tokio::spawn(async move { info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump"); - - tokio::spawn( - child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take()) - .instrument(info_span!("pg_dump")), - ); - - let st = pg_dump.wait().await.context("wait for pg_dump")?; + let st = pg_dump.wait().await.expect("wait for pg_dump"); info!(status=?st, "pg_dump exited"); if !st.success() { warn!(status=%st, "pg_dump failed, restore will likely fail as well"); } - } + + tokio::spawn( + child_stdio_to_log::relay_process_output( + None, + pg_dump.stderr.take(), + ) + .instrument(info_span!("pg_dump")), + ); + }); // TODO: do it in a streaming way, plenty of internal research done on this already // TODO: do the unlogged table trick @@ -465,14 +467,15 @@ pub(crate) async fn main() -> anyhow::Result<()> { .args(&common_args) .arg("-d") .arg(&restore_connstring) - // POSITIONAL args - .arg(&dumpdir) + .arg("--clean") + .arg("--if-exists") // how we run it .env_clear() .env("LD_LIBRARY_PATH", &pg_lib_dir) .kill_on_drop(true) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) + .stdin(std::process::Stdio::piped()) .spawn() .context("spawn pg_restore")?; @@ -484,11 +487,19 @@ pub(crate) async fn main() -> anyhow::Result<()> { ) .instrument(info_span!("pg_restore")), ); + + let mut restore_stdin = pg_restore.stdin.take().unwrap(); + + tokio::spawn(async move { + tokio::io::copy_buf(&mut buf, &mut restore_stdin).await.expect("pg_restore failed to read from pg_dump"); + }); + let st = pg_restore.wait().await.context("wait for pg_restore")?; info!(status=?st, "pg_restore exited"); if !st.success() { warn!(status=%st, "pg_restore failed, restore will likely fail as well"); } + } if let Some(mut proc) = postgres_proc {