mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
Pass lsn
This commit is contained in:
@@ -534,6 +534,7 @@ impl PageServerNode {
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
tarfile: PathBuf,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
|
||||
|
||||
@@ -542,7 +543,7 @@ impl PageServerNode {
|
||||
let mut reader = BufReader::new(file);
|
||||
|
||||
// Init writer
|
||||
let import_cmd = format!("import {tenant_id} {timeline_id}");
|
||||
let import_cmd = format!("import {tenant_id} {timeline_id} {lsn}");
|
||||
let mut writer = client.copy_in(&import_cmd)?;
|
||||
|
||||
// Stream reader -> writer
|
||||
|
||||
@@ -166,7 +166,9 @@ fn main() -> Result<()> {
|
||||
.arg(Arg::new("node-name").long("node-name").takes_value(true)
|
||||
.help("Name to assign to the imported timeline"))
|
||||
.arg(Arg::new("tarfile").long("tarfile").takes_value(true)
|
||||
.help("Basebackup tarfil to import")))
|
||||
.help("Basebackup tarfil to import"))
|
||||
.arg(Arg::new("lsn").long("lsn").takes_value(true)
|
||||
.help("Lsn the basebackup ends at")))
|
||||
).subcommand(
|
||||
App::new("tenant")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
@@ -629,10 +631,12 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
.ok_or_else(|| anyhow!("No node name provided"))?;
|
||||
let tarfile = import_match.value_of("tarfile")
|
||||
.ok_or_else(|| anyhow!("No tarfile provided"))?;
|
||||
let lsn = Lsn::from_str(import_match.value_of("lsn")
|
||||
.ok_or_else(|| anyhow!("No lsn provided"))?)?;
|
||||
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
println!("Importing timeline into pageserver ...");
|
||||
pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?)?;
|
||||
pageserver.timeline_import(tenant_id, timeline_id, tarfile.try_into()?, lsn)?;
|
||||
println!("Creating node for imported timeline ...");
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
|
||||
|
||||
@@ -533,6 +533,7 @@ impl PageServerHandler {
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter = info_span!("import", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
// TODO cap the max number of import threads allowed
|
||||
@@ -541,9 +542,8 @@ impl PageServerHandler {
|
||||
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let init_lsn = Lsn(0);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let timeline = repo.create_empty_timeline(timeline_id, init_lsn)?;
|
||||
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
|
||||
let repartition_distance = repo.get_checkpoint_distance(); // TODO
|
||||
let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
|
||||
timeline, repartition_distance);
|
||||
@@ -552,7 +552,7 @@ impl PageServerHandler {
|
||||
info!("importing basebackup");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
let reader = CopyInReader::new(pgb);
|
||||
import_timeline_from_tar(&mut datadir_timeline, reader, init_lsn)?;
|
||||
import_timeline_from_tar(&mut datadir_timeline, reader, end_lsn)?;
|
||||
|
||||
info!("done");
|
||||
Ok(())
|
||||
@@ -837,13 +837,14 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
} else if query_string.starts_with("import ") {
|
||||
let (_, params_raw) = query_string.split_at("import ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
ensure!(params.len() == 2);
|
||||
ensure!(params.len() == 3);
|
||||
let tenant = ZTenantId::from_str(params[0])?;
|
||||
let timeline = ZTimelineId::from_str(params[1])?;
|
||||
let lsn = Lsn::from_str(params[2])?;
|
||||
|
||||
self.check_permission(Some(tenant))?;
|
||||
|
||||
self.handle_import(pgb, tenant, timeline)?;
|
||||
self.handle_import(pgb, tenant, timeline, lsn)?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
|
||||
@@ -2,6 +2,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from uuid import UUID
|
||||
import tarfile
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_import(neon_env_builder,
|
||||
@@ -12,28 +14,56 @@ def test_import(neon_env_builder,
|
||||
pg_bin):
|
||||
"""Move a timeline to a new neon stack using pg_basebackup as interface."""
|
||||
node_name = "test_import"
|
||||
source_repo_dir = os.path.join(test_output_dir, "source_repo")
|
||||
destination_repo_dir = os.path.join(test_output_dir, "destination_repo")
|
||||
basebackup_dir = os.path.join(test_output_dir, "basebackup")
|
||||
basebackup_tar_path = os.path.join(test_output_dir, "basebackup.tar")
|
||||
source_repo_dir = Path(test_output_dir) / "source_repo"
|
||||
destination_repo_dir = Path(test_output_dir) / "destination_repo"
|
||||
basebackup_dir = Path(test_output_dir) / "basebackup"
|
||||
basebackup_tar_path = Path(test_output_dir) / "basebackup.tar"
|
||||
os.mkdir(basebackup_dir)
|
||||
|
||||
# Create a repo, put some data in, take basebackup, and shut it down
|
||||
with NeonEnvBuilder(source_repo_dir, port_distributor, default_broker, mock_s3_server) as builder:
|
||||
|
||||
# Insert data
|
||||
env = builder.init_start()
|
||||
env.neon_cli.create_branch(node_name)
|
||||
pg = env.postgres.create_start(node_name)
|
||||
pg.safe_psql("create table t as select generate_series(1,300000)")
|
||||
assert pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
|
||||
# Get basebackup
|
||||
lsn = pg.safe_psql('select pg_current_wal_flush_lsn()')[0][0]
|
||||
tenant = pg.safe_psql("show neon.tenant_id")[0][0]
|
||||
timeline = pg.safe_psql("show neon.timeline_id")[0][0]
|
||||
pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", basebackup_dir])
|
||||
timeline_dir = source_repo_dir / "tenants" / tenant / "timelines" / timeline
|
||||
pg_bin.run(["pg_basebackup", "-d", pg.connstr(), "-D", str(basebackup_dir)])
|
||||
|
||||
# Pack basebackup into tar file (uncompressed)
|
||||
with tarfile.open(basebackup_tar_path, "w") as tf:
|
||||
# TODO match iteration order to what pageserver would do
|
||||
tf.add(basebackup_dir)
|
||||
|
||||
# Remove timeline
|
||||
# env.pageserver.stop()
|
||||
# shutil.rmtree(timeline_dir)
|
||||
env.pageserver.http_client().timeline_detach(UUID(tenant), UUID(timeline))
|
||||
|
||||
# env.neon_cli.create_tenant(UUID(tenant))
|
||||
env.neon_cli.raw_cli([
|
||||
"timeline",
|
||||
"import",
|
||||
"--tenant-id", tenant,
|
||||
"--timeline-id", timeline,
|
||||
"--node-name", node_name,
|
||||
"--tarfile", str(basebackup_tar_path),
|
||||
"--lsn", lsn,
|
||||
])
|
||||
# pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant))
|
||||
assert pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
|
||||
|
||||
# XXX
|
||||
return
|
||||
|
||||
# Pack into tar file (uncompressed)
|
||||
with tarfile.open(basebackup_tar_path, "w") as tf:
|
||||
# TODO match iteration order to what pageserver would do
|
||||
tf.add(basebackup_dir)
|
||||
|
||||
# Create a new repo, load the basebackup into it, and check that data is there
|
||||
with NeonEnvBuilder(destination_repo_dir, port_distributor, default_broker, mock_s3_server) as builder:
|
||||
|
||||
Reference in New Issue
Block a user