Mirror of https://github.com/neondatabase/neon.git
test: Add helper function for importing a Postgres cluster (#8025)
Also, modify the "neon_local timeline import" command so that it doesn't create the endpoint anymore. I don't see any reason to bundle that into the same command; the "timeline create" and "timeline branch" commands don't do that either. I plan to add more tests similar to 'test_import_at_2bil', and this helper will reduce the copy-pasting.
commit d2753719e3 (parent 04b2ac3fed), committed via GitHub
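
For context, a minimal sketch of how a test can use the new helper together with the reworked import command. The helper signature, TenantId/TimelineId generation, tenant_create, and create_start calls are all taken from the diff below; the surrounding scaffolding (test name, branch name) is illustrative:

    # Sketch only: assumes the standard neon_fixtures pytest fixtures,
    # and that vanilla_pg has been configured and started.
    from fixtures.common_types import TenantId, TimelineId
    from fixtures.neon_fixtures import import_timeline_from_vanilla_postgres

    def test_import_example(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
        env = neon_env_builder.init_start()
        tenant_id = TenantId.generate()
        env.pageserver.tenant_create(tenant_id)
        timeline_id = TimelineId.generate()

        # Take a basebackup of the running vanilla cluster and import it as a new timeline
        import_timeline_from_vanilla_postgres(
            test_output_dir, env, pg_bin, tenant_id, timeline_id,
            "my_imported_branch", vanilla_pg.connstr(),
        )

        # "timeline import" no longer creates an endpoint, so start one explicitly
        endpoint = env.endpoints.create_start("my_imported_branch", tenant_id=tenant_id)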
@@ -600,13 +600,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
         Some(("import", import_match)) => {
             let tenant_id = get_tenant_id(import_match, env)?;
             let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
-            let name = import_match
-                .get_one::<String>("node-name")
-                .ok_or_else(|| anyhow!("No node name provided"))?;
-            let update_catalog = import_match
-                .get_one::<bool>("update-catalog")
-                .cloned()
-                .unwrap_or_default();
+            let branch_name = import_match
+                .get_one::<String>("branch-name")
+                .ok_or_else(|| anyhow!("No branch name provided"))?;

             // Parse base inputs
             let base_tarfile = import_match
@@ -633,24 +629,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
                 .copied()
                 .context("Failed to parse postgres version from the argument string")?;

-            let mut cplane = ComputeControlPlane::load(env.clone())?;
             println!("Importing timeline into pageserver ...");
             pageserver
                 .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
                 .await?;
-            env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
-
-            println!("Creating endpoint for imported timeline ...");
-            cplane.new_endpoint(
-                name,
-                tenant_id,
-                timeline_id,
-                None,
-                None,
-                pg_version,
-                ComputeMode::Primary,
-                !update_catalog,
-            )?;
+            env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
             println!("Done");
         }
         Some(("branch", branch_match)) => {
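
With the endpoint creation removed from "timeline import", callers start a compute endpoint themselves afterwards. The updated Python tests below do exactly this; in sketch form (branch name illustrative):

    # Import registers the branch mapping; starting a compute is now a separate step.
    endpoint = env.endpoints.create_start("my_imported_branch", tenant_id=tenant_id)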
@@ -1487,8 +1470,7 @@ fn cli() -> Command {
                 .about("Import timeline from basebackup directory")
                 .arg(tenant_id_arg.clone())
                 .arg(timeline_id_arg.clone())
-                .arg(Arg::new("node-name").long("node-name")
-                    .help("Name to assign to the imported timeline"))
+                .arg(branch_name_arg.clone())
                 .arg(Arg::new("base-tarfile")
                     .long("base-tarfile")
                     .value_parser(value_parser!(PathBuf))
@@ -1504,7 +1486,6 @@ fn cli() -> Command {
                 .arg(Arg::new("end-lsn").long("end-lsn")
                     .help("Lsn the basebackup ends at"))
                 .arg(pg_version_arg.clone())
-                .arg(update_catalog.clone())
                 )
             ).subcommand(
                 Command::new("tenant")
@@ -4659,6 +4659,70 @@ def fork_at_current_lsn(
     return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)


+def import_timeline_from_vanilla_postgres(
+    test_output_dir: Path,
+    env: NeonEnv,
+    pg_bin: PgBin,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    branch_name: str,
+    vanilla_pg_connstr: str,
+):
+    """
+    Create a new timeline by importing an existing PostgreSQL cluster.
+
+    This works by taking a physical backup of the running PostgreSQL cluster, and importing that.
+    """
+
+    # Take a backup of the existing PostgreSQL server with pg_basebackup
+    basebackup_dir = os.path.join(test_output_dir, "basebackup")
+    base_tar = os.path.join(basebackup_dir, "base.tar")
+    wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
+    os.mkdir(basebackup_dir)
+    pg_bin.run(
+        [
+            "pg_basebackup",
+            "-F",
+            "tar",
+            "-d",
+            vanilla_pg_connstr,
+            "-D",
+            basebackup_dir,
+        ]
+    )
+
+    # Extract start_lsn and end_lsn from the backup manifest file
+    with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
+        manifest = json.load(f)
+        start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
+        end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
+
+    # Import the backup tarballs into the pageserver
+    env.neon_cli.raw_cli(
+        [
+            "timeline",
+            "import",
+            "--tenant-id",
+            str(tenant_id),
+            "--timeline-id",
+            str(timeline_id),
+            "--branch-name",
+            branch_name,
+            "--base-lsn",
+            start_lsn,
+            "--base-tarfile",
+            base_tar,
+            "--end-lsn",
+            end_lsn,
+            "--wal-tarfile",
+            wal_tar,
+            "--pg-version",
+            env.pg_version,
+        ]
+    )
+    wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, Lsn(end_lsn))
+
+
 def last_flush_lsn_upload(
     env: NeonEnv,
     endpoint: Endpoint,
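
For reference, pg_basebackup writes backup_manifest as JSON, and the helper reads the WAL range covered by the backup from it. A sketch of the relevant structure; the LSN values are illustrative, not taken from this commit:

    # backup_manifest excerpt (illustrative values):
    # {
    #     "PostgreSQL-Backup-Manifest-Version": 1,
    #     "WAL-Ranges": [
    #         {"Timeline": 1, "Start-LSN": "0/2000028", "End-LSN": "0/2000138"}
    #     ]
    # }
    start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]  # LSN where the basebackup starts
    end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]      # LSN where the basebackup ends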
@@ -76,7 +76,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
         start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
         end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

-    endpoint_id = "ep-import_from_vanilla"
+    branch_name = "import_from_vanilla"
     tenant = TenantId.generate()
     timeline = TimelineId.generate()

@@ -106,8 +106,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
             str(tenant),
             "--timeline-id",
             str(timeline),
-            "--node-name",
-            endpoint_id,
+            "--branch-name",
+            branch_name,
             "--base-lsn",
             start_lsn,
             "--base-tarfile",
@@ -146,7 +146,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
     wait_for_upload(client, tenant, timeline, Lsn(end_lsn))

     # Check it worked
-    endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
+    endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)
     assert endpoint.safe_psql("select count(*) from t") == [(300000,)]

     vanilla_pg.stop()
@@ -265,7 +265,7 @@ def _import(
     tenant = TenantId.generate()

     # Import to pageserver
-    endpoint_id = "ep-import_from_pageserver"
+    branch_name = "import_from_pageserver"
     client = env.pageserver.http_client()
     env.pageserver.tenant_create(tenant)
     env.neon_cli.raw_cli(
@@ -276,8 +276,8 @@ def _import(
             str(tenant),
             "--timeline-id",
             str(timeline),
-            "--node-name",
-            endpoint_id,
+            "--branch-name",
+            branch_name,
             "--base-lsn",
             str(lsn),
             "--base-tarfile",
@@ -292,7 +292,7 @@ def _import(
     wait_for_upload(client, tenant, timeline, lsn)

     # Check it worked
-    endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant, lsn=lsn)
+    endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)
     assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]

     # Take another fullbackup
@@ -1,13 +1,14 @@
-import json
 import os
 import time
 from pathlib import Path

-from fixtures.common_types import Lsn, TenantId, TimelineId
+from fixtures.common_types import TenantId, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_wal_insert_lsn
-from fixtures.pageserver.utils import (
-    wait_for_last_record_lsn,
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    PgBin,
+    import_timeline_from_vanilla_postgres,
+    wait_for_wal_insert_lsn,
 )
 from fixtures.remote_storage import RemoteStorageKind
 from fixtures.utils import query_scalar
@@ -76,7 +77,6 @@ def test_import_at_2bil(
 ):
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
     env = neon_env_builder.init_start()
-    ps_http = env.pageserver.http_client()

     # Reset the vanilla Postgres instance to somewhat before 2 billion transactions.
     pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
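
The diff elides the actual pg_resetwal invocation that follows, so here is a sketch of what such a call can look like, assuming the pg_resetwal_path set up above. The -x/--next-transaction-id flag is a standard pg_resetwal option, but the exact XID value is illustrative:

    # Illustrative: set the next XID near the 2-billion mark before generating more data.
    # The vanilla server must be stopped while pg_resetwal runs.
    pg_bin.run(
        [pg_resetwal_path, "--next-transaction-id=2129920000", "-D", str(vanilla_pg.pgdatadir)]
    )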
@@ -92,68 +92,28 @@ def test_import_at_2bil(
     assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
     vanilla_pg.safe_psql("CREATE TABLE t (t text);")
     vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")

-    endpoint_id = "ep-import_from_vanilla"
-    tenant = TenantId.generate()
-    timeline = TimelineId.generate()
-
-    env.pageserver.tenant_create(tenant)
-
-    # Take basebackup
-    basebackup_dir = os.path.join(test_output_dir, "basebackup")
-    base_tar = os.path.join(basebackup_dir, "base.tar")
-    wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
-    os.mkdir(basebackup_dir)
-    vanilla_pg.safe_psql("CHECKPOINT")
-    pg_bin.run(
-        [
-            "pg_basebackup",
-            "-F",
-            "tar",
-            "-d",
-            vanilla_pg.connstr(),
-            "-D",
-            basebackup_dir,
-        ]
-    )
+    tenant_id = TenantId.generate()
+    env.pageserver.tenant_create(tenant_id)
+    timeline_id = TimelineId.generate()

+    # Import the cluster to Neon
+    import_timeline_from_vanilla_postgres(
+        test_output_dir,
+        env,
+        pg_bin,
+        tenant_id,
+        timeline_id,
+        "imported_2bil_xids",
+        vanilla_pg.connstr(),
+    )
+    vanilla_pg.stop()  # don't need the original server anymore

-    # Get start_lsn and end_lsn
-    with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
-        manifest = json.load(f)
-        start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
-        end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
-
-    def import_tar(base, wal):
-        env.neon_cli.raw_cli(
-            [
-                "timeline",
-                "import",
-                "--tenant-id",
-                str(tenant),
-                "--timeline-id",
-                str(timeline),
-                "--node-name",
-                endpoint_id,
-                "--base-lsn",
-                start_lsn,
-                "--base-tarfile",
-                base,
-                "--end-lsn",
-                end_lsn,
-                "--wal-tarfile",
-                wal,
-                "--pg-version",
-                env.pg_version,
-            ]
-        )
-
-    # Importing correct backup works
-    import_tar(base_tar, wal_tar)
-    wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
-
     # Check that it works
     endpoint = env.endpoints.create_start(
-        endpoint_id,
-        tenant_id=tenant,
+        "imported_2bil_xids",
+        tenant_id=tenant_id,
         config_lines=[
             "log_autovacuum_min_duration = 0",
             "autovacuum_naptime='5 s'",
@@ -161,7 +121,6 @@ def test_import_at_2bil(
     )
     assert endpoint.safe_psql("select count(*) from t") == [(1,)]

     # Ok, consume
     conn = endpoint.connect()
     cur = conn.cursor()
@@ -213,7 +172,7 @@ def test_import_at_2bil(
     cur.execute("checkpoint")

     # wait until pageserver receives that data
-    wait_for_wal_insert_lsn(env, endpoint, tenant, timeline)
+    wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)

     # Restart endpoint
     endpoint.stop()