This commit is contained in:
Bojan Serafimov
2022-06-08 14:37:46 -04:00
parent 28acefb31f
commit ecee80d1bf
4 changed files with 46 additions and 28 deletions

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io::Write;
use std::fs::File;
use std::io::{BufReader, Write};
use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf;
@@ -532,8 +533,21 @@ impl PageServerNode {
&self,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
tarfile: PathBuf,
) -> anyhow::Result<()> {
// TODO send using apis::upload::send_basebackup
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
// Init reader
let file = File::open(tarfile)?;
let mut reader = BufReader::new(file);
// Init writer
let import_cmd = format!("import {tenant_id} {timeline_id}");
let mut writer = client.copy_in(&import_cmd)?;
// Stream reader -> writer
io::copy(&mut reader, &mut writer)?;
writer.finish()?;
Ok(())
}
}

View File

@@ -164,7 +164,9 @@ fn main() -> Result<()> {
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(Arg::new("node-name").long("node-name").takes_value(true)
.help("Name to assign to the imported timeline")))
.help("Name to assign to the imported timeline"))
.arg(Arg::new("tarfile").long("tarfile").takes_value(true)
.help("Basebackup tarfil to import")))
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -625,12 +627,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.expect("No timeline id provided");
let name = import_match.value_of("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
let tarfile = import_match.value_of("tarfile")
.ok_or_else(|| anyhow!("No tarfile provided"))?;
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id)?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
println!("Done");
},

View File

@@ -464,36 +464,32 @@ impl PageServerHandler {
thread_mgr::associate_with(Some(tenant), Some(timeline));
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.write_message(&BeMessage::CopyInResponse)?;
while !thread_mgr::is_shutdown_requested() {
match pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue, // TODO maybe error?
};
let fe_msg = ImportFeMessage::des(&copy_data_bytes)?;
match fe_msg {
ImportFeMessage::File(name, _file_bytes) => {
info!("Got file {}", name);
// TODO chunk (what does pg_basebackup do?)
// TODO handle
}
ImportFeMessage::Done => {
info!("Done importing.");
pgb.write_message(&BeMessage::CopyData(&ImportBeMessage::Done.ser()?))?;
FeMessage::CopyDone => {
return Ok(())
}
}
m => {
info!("unexpected copy in client message {:?}", m);
continue;
},
};
// TODO use copy_data_bytes
},
Ok(None) => {
todo!("error on none?");
// Is this ok?
return Ok(())
},
Err(e) => {
todo!("import error handler not implemented {}", e);
if !is_socket_read_timed_out(&e) {
return Err(e);
}
},
}
}
@@ -787,8 +783,6 @@ impl postgres_backend::Handler for PageServerHandler {
info!("Importing timeline {}.{}", tenant, timeline);
self.handle_import(pgb, tenant, timeline)?;
info!("Done importing timeline {}.{}", tenant, timeline);
// Do I need this?
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -1,5 +1,6 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from uuid import UUID
import tarfile
import os
@@ -14,6 +15,7 @@ def test_import(neon_env_builder,
source_repo_dir = os.path.join(test_output_dir, "source_repo")
destination_repo_dir = os.path.join(test_output_dir, "destination_repo")
basebackup_dir = os.path.join(test_output_dir, "basebackup")
basebackup_tar_path = os.path.join(test_output_dir, "basebackup.tar.gz")
os.mkdir(basebackup_dir)
# Create a repo, put some data in, take basebackup, and shut it down
@@ -28,17 +30,21 @@ def test_import(neon_env_builder,
timeline = pg.safe_psql("show neon.timeline_id")[0][0]
pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", basebackup_dir])
# compress basebackup
with tarfile.open(basebackup_tar_path, "w:gz") as tf:
# TODO match iteration order to what pageserver would do
tf.add(basebackup_dir)
# Create a new repo, load the basebackup into it, and check that data is there
with NeonEnvBuilder(destination_repo_dir, port_distributor, default_broker, mock_s3_server) as builder:
env = builder.init_start()
# TODO do everything from neon_cli instead
env.pageserver.safe_psql(f"import {tenant} {timeline}")
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id", tenant,
"--timeline-id", timeline,
"--node-name", node_name,
"--tarfile", basebackup_tar_path,
])
pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant))
assert pg.safe_psql('select count(*) from t') == [(300000, )]
# pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant))
# assert pg.safe_psql('select count(*) from t') == [(300000, )]