From ecee80d1bf01d37ada4ed04ca46b612096ea8118 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 8 Jun 2022 14:37:46 -0400 Subject: [PATCH] Send tar --- control_plane/src/storage.rs | 18 ++++++++++++-- neon_local/src/main.rs | 10 +++++--- pageserver/src/page_service.rs | 32 ++++++++++--------------- test_runner/batch_others/test_import.py | 14 +++++++---- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 2baf496ed4..aa087b6c27 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::io::Write; +use std::fs::File; +use std::io::{BufReader, Write}; use std::net::TcpStream; use std::num::NonZeroU64; use std::path::PathBuf; @@ -532,8 +533,21 @@ impl PageServerNode { &self, tenant_id: ZTenantId, timeline_id: ZTimelineId, + tarfile: PathBuf, ) -> anyhow::Result<()> { - // TODO send using apis::upload::send_basebackup + let mut client = self.pg_connection_config.connect(NoTls).unwrap(); + + // Init reader + let file = File::open(tarfile)?; + let mut reader = BufReader::new(file); + + // Init writer + let import_cmd = format!("import {tenant_id} {timeline_id}"); + let mut writer = client.copy_in(&import_cmd)?; + + // Stream reader -> writer + io::copy(&mut reader, &mut writer)?; + writer.finish()?; Ok(()) } } diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 81f7c62a0f..4bec31912d 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -164,7 +164,9 @@ fn main() -> Result<()> { .arg(tenant_id_arg.clone()) .arg(timeline_id_arg.clone()) .arg(Arg::new("node-name").long("node-name").takes_value(true) - .help("Name to assign to the imported timeline"))) + .help("Name to assign to the imported timeline")) + .arg(Arg::new("tarfile").long("tarfile").takes_value(true) + .help("Basebackup tarfile to import"))) ).subcommand( App::new("tenant") .setting(AppSettings::ArgRequiredElseHelp) @@ 
-625,12 +627,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - .expect("No timeline id provided"); let name = import_match.value_of("node-name") .ok_or_else(|| anyhow!("No node name provided"))?; + let tarfile = import_match.value_of("tarfile") + .ok_or_else(|| anyhow!("No tarfile provided"))?; let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id)?; - env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; + pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?)?; println!("Creating node for imported timeline ..."); + env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; cplane.new_node(tenant_id, name, timeline_id, None, None)?; println!("Done"); }, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e0baf5293a..35f6000b9a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -464,36 +464,32 @@ impl PageServerHandler { thread_mgr::associate_with(Some(tenant), Some(timeline)); // switch client to COPYBOTH - pgb.write_message(&BeMessage::CopyBothResponse)?; + pgb.write_message(&BeMessage::CopyInResponse)?; while !thread_mgr::is_shutdown_requested() { match pgb.read_message() { Ok(Some(message)) => { let copy_data_bytes = match message { FeMessage::CopyData(bytes) => bytes, - _ => continue, // TODO maybe error? - }; - - let fe_msg = ImportFeMessage::des(©_data_bytes)?; - match fe_msg { - ImportFeMessage::File(name, _file_bytes) => { - info!("Got file {}", name); - // TODO chunk (what does pg_basebackup do?) 
- // TODO handle - } - ImportFeMessage::Done => { - info!("Done importing."); - pgb.write_message(&BeMessage::CopyData(&ImportBeMessage::Done.ser()?))?; + FeMessage::CopyDone => { return Ok(()) } - } + m => { + info!("unexpected copy in client message {:?}", m); + continue; + }, + }; + // TODO use copy_data_bytes }, Ok(None) => { - todo!("error on none?"); + // Is this ok? + return Ok(()) }, Err(e) => { - todo!("import error handler not implemented {}", e); + if !is_socket_read_timed_out(&e) { + return Err(e); + } }, } } @@ -787,8 +783,6 @@ impl postgres_backend::Handler for PageServerHandler { info!("Importing timeline {}.{}", tenant, timeline); self.handle_import(pgb, tenant, timeline)?; info!("Done importing timeline {}.{}", tenant, timeline); - - // Do I need this? pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index 976c8d7f64..ea829eb3e6 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -1,5 +1,6 @@ from fixtures.neon_fixtures import NeonEnvBuilder from uuid import UUID +import tarfile import os @@ -14,6 +15,7 @@ def test_import(neon_env_builder, source_repo_dir = os.path.join(test_output_dir, "source_repo") destination_repo_dir = os.path.join(test_output_dir, "destination_repo") basebackup_dir = os.path.join(test_output_dir, "basebackup") + basebackup_tar_path = os.path.join(test_output_dir, "basebackup.tar.gz") os.mkdir(basebackup_dir) # Create a repo, put some data in, take basebackup, and shut it down @@ -28,17 +30,21 @@ def test_import(neon_env_builder, timeline = pg.safe_psql("show neon.timeline_id")[0][0] pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", basebackup_dir]) + # compress basebackup + with tarfile.open(basebackup_tar_path, "w:gz") as tf: + # 
TODO match iteration order to what pageserver would do + tf.add(basebackup_dir) + # Create a new repo, load the basebackup into it, and check that data is there with NeonEnvBuilder(destination_repo_dir, port_distributor, default_broker, mock_s3_server) as builder: env = builder.init_start() - # TODO do everything from neon_cli instead - env.pageserver.safe_psql(f"import {tenant} {timeline}") env.neon_cli.raw_cli([ "timeline", "import", "--tenant-id", tenant, "--timeline-id", timeline, "--node-name", node_name, + "--tarfile", basebackup_tar_path, ]) - pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant)) - assert pg.safe_psql('select count(*) from t') == [(300000, )] + # pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant)) + # assert pg.safe_psql('select count(*) from t') == [(300000, )]