diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index aa087b6c27..86c59d0ec2 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -534,6 +534,7 @@ impl PageServerNode { tenant_id: ZTenantId, timeline_id: ZTimelineId, tarfile: PathBuf, + lsn: Lsn, ) -> anyhow::Result<()> { let mut client = self.pg_connection_config.connect(NoTls).unwrap(); @@ -542,7 +543,7 @@ impl PageServerNode { let mut reader = BufReader::new(file); // Init writer - let import_cmd = format!("import {tenant_id} {timeline_id}"); + let import_cmd = format!("import {tenant_id} {timeline_id} {lsn}"); let mut writer = client.copy_in(&import_cmd)?; // Stream reader -> writer diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 4bec31912d..0e48d6bd6c 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -166,7 +166,9 @@ fn main() -> Result<()> { .arg(Arg::new("node-name").long("node-name").takes_value(true) .help("Name to assign to the imported timeline")) .arg(Arg::new("tarfile").long("tarfile").takes_value(true) - .help("Basebackup tarfil to import"))) + .help("Basebackup tarfil to import")) + .arg(Arg::new("lsn").long("lsn").takes_value(true) + .help("Lsn the basebackup ends at"))) ).subcommand( App::new("tenant") .setting(AppSettings::ArgRequiredElseHelp) @@ -629,10 +631,12 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - .ok_or_else(|| anyhow!("No node name provided"))?; let tarfile = import_match.value_of("tarfile") .ok_or_else(|| anyhow!("No tarfile provided"))?; + let lsn = Lsn::from_str(import_match.value_of("lsn") + .ok_or_else(|| anyhow!("No lsn provided"))?)?; let mut cplane = ComputeControlPlane::load(env.clone())?; println!("Importing timeline into pageserver ..."); - pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?)?; + pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?, lsn)?; println!("Creating node for imported timeline ..."); env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; cplane.new_node(tenant_id, name, timeline_id, None, None)?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2df453d0e7..341c9cd3ae 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -533,6 +533,7 @@ impl PageServerHandler { pgb: &mut PostgresBackend, tenant_id: ZTenantId, timeline_id: ZTimelineId, + end_lsn: Lsn, ) -> anyhow::Result<()> { let _enter = info_span!("import", timeline = %timeline_id, tenant = %tenant_id).entered(); // TODO cap the max number of import threads allowed @@ -541,9 +542,8 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); - let init_lsn = Lsn(0); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let timeline = repo.create_empty_timeline(timeline_id, init_lsn)?; + let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?; let repartition_distance = repo.get_checkpoint_distance(); // TODO let mut datadir_timeline = DatadirTimeline::::new( timeline, repartition_distance); @@ -552,7 +552,7 @@ impl PageServerHandler { info!("importing basebackup"); pgb.write_message(&BeMessage::CopyInResponse)?; let reader = CopyInReader::new(pgb); - import_timeline_from_tar(&mut datadir_timeline, reader, init_lsn)?; + import_timeline_from_tar(&mut datadir_timeline, reader, end_lsn)?; info!("done"); Ok(()) @@ -837,13 +837,14 @@ impl postgres_backend::Handler for PageServerHandler { } else if query_string.starts_with("import ") { let (_, params_raw) = query_string.split_at("import ".len()); let params = params_raw.split_whitespace().collect::>(); - ensure!(params.len() == 2); + ensure!(params.len() == 3); let tenant = ZTenantId::from_str(params[0])?; let timeline = ZTimelineId::from_str(params[1])?; + let lsn = Lsn::from_str(params[2])?; self.check_permission(Some(tenant))?; - self.handle_import(pgb, tenant, timeline)?; + self.handle_import(pgb, tenant, timeline, lsn)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index ac5d606fb0..72f069f889 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -2,6 +2,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder from uuid import UUID import tarfile import os +import shutil +from pathlib import Path def test_import(neon_env_builder, @@ -12,28 +14,56 @@ def test_import(neon_env_builder, pg_bin): """Move a timeline to a new neon stack using pg_basebackup as interface.""" node_name = "test_import" - source_repo_dir = os.path.join(test_output_dir, "source_repo") - destination_repo_dir = os.path.join(test_output_dir, "destination_repo") - basebackup_dir = os.path.join(test_output_dir, "basebackup") - basebackup_tar_path = os.path.join(test_output_dir, "basebackup.tar") + source_repo_dir = Path(test_output_dir) / "source_repo" + destination_repo_dir = Path(test_output_dir) / "destination_repo" + basebackup_dir = Path(test_output_dir) / "basebackup" + basebackup_tar_path = Path(test_output_dir) / "basebackup.tar" os.mkdir(basebackup_dir) # Create a repo, put some data in, take basebackup, and shut it down with NeonEnvBuilder(source_repo_dir, port_distributor, default_broker, mock_s3_server) as builder: + + # Insert data env = builder.init_start() env.neon_cli.create_branch(node_name) pg = env.postgres.create_start(node_name) pg.safe_psql("create table t as select generate_series(1,300000)") assert pg.safe_psql('select count(*) from t') == [(300000, )] + # Get basebackup + lsn = pg.safe_psql('select pg_current_wal_flush_lsn()')[0][0] tenant = pg.safe_psql("show neon.tenant_id")[0][0] timeline = pg.safe_psql("show neon.timeline_id")[0][0] - pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", basebackup_dir]) + timeline_dir = source_repo_dir / "tenants" / tenant / "timelines" / timeline + pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", str(basebackup_dir)]) + + # Pack basebackup into tar file (uncompressed) + with tarfile.open(basebackup_tar_path, "w") as tf: + # TODO match iteration order to what pageserver would do + tf.add(basebackup_dir) + + # Remove timeline + # env.pageserver.stop() + # shutil.rmtree(timeline_dir) + env.pageserver.http_client().timeline_detach(UUID(tenant), UUID(timeline)) + + # env.neon_cli.create_tenant(UUID(tenant)) + env.neon_cli.raw_cli([ + "timeline", + "import", + "--tenant-id", tenant, + "--timeline-id", timeline, + "--node-name", node_name, + "--tarfile", str(basebackup_tar_path), + "--lsn", lsn, + ]) + # pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant)) + assert pg.safe_psql('select count(*) from t') == [(300000, )] + + + # XXX + return - # Pack into tar file (uncompressed) - with tarfile.open(basebackup_tar_path, "w") as tf: - # TODO match iteration order to what pageserver would do - tf.add(basebackup_dir) # Create a new repo, load the basebackup into it, and check that data is there with NeonEnvBuilder(destination_repo_dir, port_distributor, default_broker, mock_s3_server) as builder: