This commit is contained in:
Christian Schwarz
2023-12-15 12:29:02 +00:00
parent 83bdebb4af
commit 2664e9b834
2 changed files with 32 additions and 26 deletions

View File

@@ -6,9 +6,9 @@
//!
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{BufReader, Write};
use std::io::Write;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
@@ -16,7 +16,7 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use pageserver::client::mgmt_api;
use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
@@ -541,7 +541,7 @@ impl PageServerNode {
// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?;
let mut base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
@@ -552,29 +552,36 @@ impl PageServerNode {
(start_lsn, None)
};
let copy_in = |reader, cmd| {
let client = &client;
async move {
let writer = client.copy_in(&cmd).await?;
let writer = std::pin::pin!(writer);
let mut writer = writer.sink_map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))
});
let mut reader = std::pin::pin!(reader);
writer.send_all(&mut reader).await?;
writer.into_inner().finish().await?;
anyhow::Ok(())
}
};
// Import base
let import_cmd = format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
);
let writer = client.copy_in(&import_cmd).await?;
let mut writer = std::pin::pin!(writer);
let mut writer =
writer.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")));
let mut base_tarfile = std::pin::pin!(base_tarfile);
writer.send_all(&mut base_tarfile).await?;
writer.into_inner().finish().await?;
copy_in(
base_tarfile,
format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
),
)
.await?;
// Import wal if necessary
if let Some(mut wal_reader) = wal_reader {
let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let writer = client.copy_in(&import_cmd).await?;
let mut writer = std::pin::pin!(writer);
let mut writer = writer
.sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")));
let mut wal_reader = std::pin::pin!(wal_reader);
writer.send_all(&mut wal_reader).await?;
writer.into_inner().finish().await?;
if let Some(wal_reader) = wal_reader {
copy_in(
wal_reader,
format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"),
)
.await?;
}
Ok(())

View File

@@ -4,7 +4,6 @@ use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt;
use tokio_postgres::tls::NoTlsStream;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.