diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index d812ff12e3..e01f28e07b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -6,9 +6,9 @@ //! use std::borrow::Cow; use std::collections::HashMap; -use std::fs::File; + use std::io; -use std::io::{BufReader, Write}; +use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; use std::process::{Child, Command}; @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use pageserver::client::mgmt_api; use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; @@ -541,7 +541,7 @@ impl PageServerNode { // Init base reader let (start_lsn, base_tarfile_path) = base; let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?; - let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); + let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile); // Init wal reader if necessary let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { @@ -552,29 +552,36 @@ impl PageServerNode { (start_lsn, None) }; + let copy_in = |reader, cmd| { + let client = &client; + async move { + let writer = client.copy_in(&cmd).await?; + let writer = std::pin::pin!(writer); + let mut writer = writer.sink_map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")) + }); + let mut reader = std::pin::pin!(reader); + writer.send_all(&mut reader).await?; + writer.into_inner().finish().await?; + anyhow::Ok(()) + } + }; + // Import base - let import_cmd = format!( - "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" - ); - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = - writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut base_tarfile = std::pin::pin!(base_tarfile); - writer.send_all(&mut base_tarfile).await?; - writer.into_inner().finish().await?; - + copy_in( + base_tarfile, + format!( + "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}" + ), + ) + .await?; // Import wal if necessary - if let Some(mut wal_reader) = wal_reader { - let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); - - let writer = client.copy_in(&import_cmd).await?; - let mut writer = std::pin::pin!(writer); - let mut writer = writer - .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))); - let mut wal_reader = std::pin::pin!(wal_reader); - writer.send_all(&mut wal_reader).await?; - writer.into_inner().finish().await?; + if let Some(wal_reader) = wal_reader { + copy_in( + wal_reader, + format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"), + ) + .await?; } Ok(()) diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index d793abc8d0..ccf9108895 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -4,7 +4,6 @@ use anyhow::{bail, Context}; use itertools::Itertools; use std::borrow::Cow; use std::fmt; -use tokio_postgres::tls::NoTlsStream; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair.