Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-03 19:42:55 +00:00
refactor(test): duplication with fullbackup, tar content hashing (#7828)
"taking a fullbackup" is an ugly multi-liner copypasted in multiple places, most recently with timeline ancestor detach tests. move it under `PgBin` which is not a great place, but better than yet another utility function. Additionally: - cleanup `psql_env` repetition (PgBin already configures that) - move the backup tar comparison as a yet another free utility function - use backup tar comparison in `test_import.py` where a size check was done previously - cleanup extra timeline creation from test Cc: #7715
@@ -2788,6 +2788,28 @@ class PgBin:
         log.info(f"last checkpoint at {checkpoint_lsn}")
         return Lsn(checkpoint_lsn)
 
+    def take_fullbackup(
+        self,
+        pageserver: NeonPageserver,
+        tenant: TenantId,
+        timeline: TimelineId,
+        lsn: Lsn,
+        output: Path,
+    ):
+        """
+        Request fullbackup from pageserver, store it at 'output'.
+        """
+        cmd = [
+            "psql",
+            "--no-psqlrc",
+            pageserver.connstr(),
+            "-c",
+            f"fullbackup {tenant} {timeline} {lsn}",
+            "-o",
+            str(output),
+        ]
+        self.run_capture(cmd)
+
 
 @pytest.fixture(scope="function")
 def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
@@ -4,10 +4,13 @@ import json
 import os
 import re
 import subprocess
+import tarfile
 import threading
 import time
+from hashlib import sha256
 from pathlib import Path
 from typing import (
+    IO,
     TYPE_CHECKING,
     Any,
     Callable,
@@ -15,8 +18,10 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     TypeVar,
+    Union,
 )
 from urllib.parse import urlencode
 
@@ -499,3 +504,48 @@ class AuxFileStore(str, enum.Enum):
 
     def __str__(self) -> str:
         return f"'aux-{self.value}'"
+
+
+def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str]):
+    """
+    This is essentially:
+
+    lines=$(comm -3 \
+      <(mkdir left && cd left && tar xf "$left" && find . -type f -print0 | xargs sha256sum | sort -k2) \
+      <(mkdir right && cd right && tar xf "$right" && find . -type f -print0 | xargs sha256sum | sort -k2) \
+      | wc -l)
+    [ "$lines" = "0" ]
+
+    But in a more mac friendly fashion.
+    """
+    started_at = time.time()
+
+    def hash_extracted(reader: Union[IO[bytes], None]) -> bytes:
+        assert reader is not None
+        digest = sha256(usedforsecurity=False)
+        while True:
+            buf = reader.read(64 * 1024)
+            if not buf:
+                break
+            digest.update(buf)
+        return digest.digest()
+
+    def build_hash_list(p: Path) -> List[Tuple[str, bytes]]:
+        with tarfile.open(p) as f:
+            matching_files = (info for info in f if info.isreg() and info.name not in skip_files)
+            ret = list(
+                map(lambda info: (info.name, hash_extracted(f.extractfile(info))), matching_files)
+            )
+            ret.sort(key=lambda t: t[0])
+            return ret
+
+    left_list, right_list = map(build_hash_list, [left, right])
+
+    try:
+        assert len(left_list) == len(right_list)
+
+        for left_tuple, right_tuple in zip(left_list, right_list):
+            assert left_tuple == right_tuple
+    finally:
+        elapsed = time.time() - started_at
+        log.info(f"assert_pageserver_backups_equal completed in {elapsed}s")
@@ -1,7 +1,7 @@
 import os
 from pathlib import Path
 
-from fixtures.common_types import Lsn, TimelineId
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
@@ -19,17 +19,16 @@ def test_fullbackup(
     neon_env_builder: NeonEnvBuilder,
     pg_bin: PgBin,
     port_distributor: PortDistributor,
-    pg_distrib_dir: Path,
     test_output_dir: Path,
 ):
     env = neon_env_builder.init_start()
 
-    env.neon_cli.create_branch("test_fullbackup")
-    endpoint_main = env.endpoints.create_start("test_fullbackup")
+    # endpoint needs to be alive until the fullbackup so that we have
+    # prev_record_lsn for the vanilla_pg to start in read-write mode
+    # for some reason this does not happen if endpoint is shutdown.
+    endpoint_main = env.endpoints.create_start("main")
 
     with endpoint_main.cursor() as cur:
-        timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
-
         # data loading may take a while, so increase statement timeout
         cur.execute("SET statement_timeout='300s'")
         cur.execute(
@@ -41,17 +40,13 @@ def test_fullbackup(
         lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
         log.info(f"start_backup_lsn = {lsn}")
 
-    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
-    # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
-    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
-
     # Get and unpack fullbackup from pageserver
     restored_dir_path = env.repo_dir / "restored_datadir"
     os.mkdir(restored_dir_path, 0o750)
-    query = f"fullbackup {env.initial_tenant} {timeline} {lsn}"
     tar_output_file = test_output_dir / "fullbackup.tar"
-    cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query, "-o", str(tar_output_file)]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(
+        env.pageserver, env.initial_tenant, env.initial_timeline, lsn, tar_output_file
+    )
     subprocess_capture(
         env.repo_dir, ["tar", "-xf", str(tar_output_file), "-C", str(restored_dir_path)]
     )
@@ -61,7 +56,7 @@
     # use resetwal to overwrite it
     pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
     cmd = [pg_resetwal_path, "-D", str(restored_dir_path)]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.run_capture(cmd)
 
     # Restore from the backup and find the data we inserted
     port = port_distributor.get_port()
@@ -21,7 +21,7 @@ from fixtures.pageserver.utils import (
     wait_for_upload,
 )
 from fixtures.remote_storage import RemoteStorageKind
-from fixtures.utils import subprocess_capture
+from fixtures.utils import assert_pageserver_backups_equal, subprocess_capture
 
 
 def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
@@ -248,15 +248,9 @@ def _import(
     path to the backup archive file"""
     log.info(f"start_backup_lsn = {lsn}")
 
-    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
-    # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
-    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
-
     # Get a fullbackup from pageserver
-    query = f"fullbackup { env.initial_tenant} {timeline} {lsn}"
     tar_output_file = test_output_dir / "fullbackup.tar"
-    cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query, "-o", str(tar_output_file)]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(env.pageserver, env.initial_tenant, timeline, lsn, tar_output_file)
 
     # Stop the first pageserver instance, erase all its data
     env.endpoints.stop_all()
@@ -305,22 +299,11 @@ def _import(
     assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]
 
     # Take another fullbackup
-    query = f"fullbackup { tenant} {timeline} {lsn}"
     new_tar_output_file = test_output_dir / "fullbackup-new.tar"
-    cmd = [
-        "psql",
-        "--no-psqlrc",
-        env.pageserver.connstr(),
-        "-c",
-        query,
-        "-o",
-        str(new_tar_output_file),
-    ]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(env.pageserver, tenant, timeline, lsn, new_tar_output_file)
 
     # Check it's the same as the first fullbackup
-    # TODO pageserver should be checking checksum
-    assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)
+    assert_pageserver_backups_equal(tar_output_file, new_tar_output_file, set())
 
     # Check that gc works
     pageserver_http = env.pageserver.http_client()
@@ -5,7 +5,7 @@ from pathlib import Path
 
 from fixtures.common_types import Lsn, TenantId, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_wal_insert_lsn
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_wal_insert_lsn
 from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
 )
@@ -71,22 +71,17 @@ def test_next_xid(neon_env_builder: NeonEnvBuilder):
 def test_import_at_2bil(
     neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
-    pg_distrib_dir: Path,
-    pg_bin,
+    pg_bin: PgBin,
     vanilla_pg,
 ):
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
     env = neon_env_builder.init_start()
     ps_http = env.pageserver.http_client()
 
-    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
-    # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
-    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
-
     # Reset the vanilla Postgres instance to somewhat before 2 billion transactions.
     pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
     cmd = [pg_resetwal_path, "--next-transaction-id=2129920000", "-D", str(vanilla_pg.pgdatadir)]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.run_capture(cmd)
 
     vanilla_pg.start()
     vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
@@ -1,25 +1,18 @@
 import datetime
 import enum
-import tarfile
-import time
 from concurrent.futures import ThreadPoolExecutor
-from hashlib import sha256
-from pathlib import Path
 from queue import Empty, Queue
 from threading import Barrier
-from typing import IO, List, Set, Tuple, Union
+from typing import List, Tuple
 
 import pytest
 from fixtures.common_types import Lsn, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
-    PgBin,
-    wait_for_last_flush_lsn,
-)
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
 from fixtures.pageserver.http import HistoricLayerInfo
 from fixtures.pageserver.utils import wait_timeline_detail_404
 from fixtures.remote_storage import LocalFsStorage
+from fixtures.utils import assert_pageserver_backups_equal
 
 
 def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
@@ -68,7 +61,6 @@ SHUTDOWN_ALLOWED_ERRORS = [
 @pytest.mark.parametrize("write_to_branch_first", [True, False])
 def test_ancestor_detach_branched_from(
     test_output_dir,
-    pg_distrib_dir,
     neon_env_builder: NeonEnvBuilder,
     pg_bin: PgBin,
     branchpoint: Branchpoint,
@@ -80,7 +72,6 @@ def test_ancestor_detach_branched_from(
     """
    env = neon_env_builder.init_start()
 
-    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
     env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
 
     client = env.pageserver.http_client()
@@ -160,16 +151,9 @@ def test_ancestor_detach_branched_from(
     # run fullbackup to make sure there are no off by one errors
     # take this on the parent
     fullbackup_before = test_output_dir / "fullbackup-before.tar"
-    cmd = [
-        "psql",
-        "--no-psqlrc",
-        env.pageserver.connstr(),
-        "-c",
-        f"fullbackup {env.initial_tenant} {env.initial_timeline} {branch_at}",
-        "-o",
-        str(fullbackup_before),
-    ]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(
+        env.pageserver, env.initial_tenant, env.initial_timeline, branch_at, fullbackup_before
+    )
 
     all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
     assert all_reparented == set()
@@ -200,16 +184,9 @@ def test_ancestor_detach_branched_from(
 
     # take this on the detached, at same lsn
     fullbackup_after = test_output_dir / "fullbackup-after.tar"
-    cmd = [
-        "psql",
-        "--no-psqlrc",
-        env.pageserver.connstr(),
-        "-c",
-        f"fullbackup {env.initial_tenant} {timeline_id} {branch_at}",
-        "-o",
-        str(fullbackup_after),
-    ]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(
+        env.pageserver, env.initial_tenant, timeline_id, branch_at, fullbackup_after
+    )
 
     client.timeline_delete(env.initial_tenant, env.initial_timeline)
     wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
@@ -218,52 +195,7 @@ def test_ancestor_detach_branched_from(
     # as there is always "PREV_LSN: invalid" for "before"
     skip_files = {"zenith.signal"}
 
-    tar_cmp(fullbackup_before, fullbackup_after, skip_files)
-
-
-def tar_cmp(left: Path, right: Path, skip_files: Set[str]):
-    """
-    This is essentially:
-
-    lines=$(comm -3 \
-      <(mkdir left && cd left && tar xf "$left" && find . -type f -print0 | xargs sha256sum | sort -k2) \
-      <(mkdir right && cd right && tar xf "$right" && find . -type f -print0 | xargs sha256sum | sort -k2) \
-      | wc -l)
-    [ "$lines" = "0" ]
-
-    But in a more mac friendly fashion.
-    """
-    started_at = time.time()
-
-    def hash_extracted(reader: Union[IO[bytes], None]) -> bytes:
-        assert reader is not None
-        digest = sha256(usedforsecurity=False)
-        while True:
-            buf = reader.read(64 * 1024)
-            if not buf:
-                break
-            digest.update(buf)
-        return digest.digest()
-
-    def build_hash_list(p: Path) -> List[Tuple[str, bytes]]:
-        with tarfile.open(p) as f:
-            matching_files = (info for info in f if info.isreg() and info.name not in skip_files)
-            ret = list(
-                map(lambda info: (info.name, hash_extracted(f.extractfile(info))), matching_files)
-            )
-            ret.sort(key=lambda t: t[0])
-            return ret
-
-    left_list, right_list = map(build_hash_list, [left, right])
-
-    try:
-        assert len(left_list) == len(right_list)
-
-        for left_tuple, right_tuple in zip(left_list, right_list):
-            assert left_tuple == right_tuple
-    finally:
-        elapsed = time.time() - started_at
-        log.info(f"tar_cmp completed in {elapsed}s")
+    assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files)
 
 
 def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
@@ -483,7 +415,7 @@ def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEn
 
 
 def test_compaction_induced_by_detaches_in_history(
-    neon_env_builder: NeonEnvBuilder, test_output_dir, pg_distrib_dir, pg_bin: PgBin
+    neon_env_builder: NeonEnvBuilder, test_output_dir, pg_bin: PgBin
 ):
     """
     Assuming the tree of timelines:
@@ -500,8 +432,6 @@ def test_compaction_induced_by_detaches_in_history(
     timeline broken.
     """
 
-    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
-
     env = neon_env_builder.init_start(
         initial_tenant_conf={
             # we want to create layers manually so we don't branch on arbitrary
@@ -589,16 +519,9 @@ def test_compaction_induced_by_detaches_in_history(
 
     # take the fullbackup before and after inheriting the new L0s
     fullbackup_before = test_output_dir / "fullbackup-before.tar"
-    cmd = [
-        "psql",
-        "--no-psqlrc",
-        env.pageserver.connstr(),
-        "-c",
-        f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
-        "-o",
-        str(fullbackup_before),
-    ]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(
+        env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_before
+    )
 
     for _, timeline_id in skip_main:
         reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
@@ -624,19 +547,12 @@ def test_compaction_induced_by_detaches_in_history(
     assert len(post_compact_l0s) == 1, "only the consecutive inherited L0s should be compacted"
 
     fullbackup_after = test_output_dir / "fullbackup_after.tar"
-    cmd = [
-        "psql",
-        "--no-psqlrc",
-        env.pageserver.connstr(),
-        "-c",
-        f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
-        "-o",
-        str(fullbackup_after),
-    ]
-    pg_bin.run_capture(cmd, env=psql_env)
+    pg_bin.take_fullbackup(
+        env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after
+    )
 
     # we don't need to skip any files, because zenith.signal will be identical
-    tar_cmp(fullbackup_before, fullbackup_after, set())
+    assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set())
 
 
 # TODO: