Merge branch 'main' into ps-trace

This commit is contained in:
Bojan Serafimov
2022-10-05 13:24:06 -04:00
165 changed files with 5246 additions and 2708 deletions

View File

@@ -6,6 +6,9 @@ Prerequisites:
- Correctly configured Python, see [`/docs/sourcetree.md`](/docs/sourcetree.md#using-python)
- Neon and Postgres binaries
- See the root [README.md](/README.md) for build directions
If you want to run tests that rely on test-only APIs, you need to add `--features testing` to the Rust build commands.
For convenience, the repository's cargo config contains a `build_testing` alias that acts as a subcommand and adds the required feature flags.
Usage example: `cargo build_testing --release` is equivalent to `cargo build --features testing --release`
- Tests can be run from the git tree; or see the environment variables
below to run from other directories.
- The neon git repo, including the postgres submodule
@@ -53,10 +56,24 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parallel we use the `pytest-xdist` plugin. By default everything runs single-threaded. The number of workers can be specified with the `-n` argument:
`./scripts/pytest -n4`
By default, performance tests are excluded. To run them, explicitly pass the performance test directory to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
Since pageserver supports several postgres versions, `POSTGRES_DISTRIB_DIR` must contain
a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`DEFAULT_PG_VERSION`: The version of Postgres to use.
This is used to construct the full path to the postgres binaries.
The format is the 2-digit major version number, e.g. `DEFAULT_PG_VERSION="14"`
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.

View File

@@ -89,6 +89,7 @@ class NeonCompare(PgCompare):
self.env = neon_simple_env
self._zenbenchmark = zenbenchmark
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
@@ -110,10 +111,6 @@ class NeonCompare(PgCompare):
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
# Long-lived cursor, useful for flushing
self.psconn = self.env.pageserver.connect()
self.pscur = self.psconn.cursor()
@property
def pg(self):
return self._pg
@@ -127,10 +124,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
    """Ask the pageserver to garbage-collect this benchmark's timeline.

    Issues a GC request with a horizon of 0 through the pageserver HTTP
    client.  The request is addressed to ``self.tenant`` -- the tenant this
    fixture creates for itself and starts its compute node on -- not to
    ``self.env.initial_tenant``.  The text-protocol command this call
    replaces also used ``self.tenant``.
    """
    # NOTE(review): self.timeline is assigned outside the visible code and
    # presumably belongs to self.tenant -- confirm.
    self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
def compact(self):
    """Ask the pageserver to compact this benchmark's timeline.

    Issues a compaction request through the pageserver HTTP client.  The
    request is addressed to ``self.tenant`` -- the tenant this fixture creates
    for itself -- not to ``self.env.initial_tenant``.  The text-protocol
    command this call replaces also used ``self.tenant``.
    """
    # NOTE(review): self.timeline is assigned outside the visible code and
    # presumably belongs to self.tenant -- confirm.
    self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(

View File

@@ -1,6 +1,5 @@
import logging
import logging.config
import re
"""
This file configures logging to use in python tests.
@@ -30,17 +29,6 @@ LOGGING = {
}
class PasswordFilter(logging.Filter):
    """Logging filter that masks ``password=...`` values in log records.

    Every record is let through; only its text is rewritten so that the value
    following ``password=`` is replaced by ``<hidden>``.
    """

    # Good enough to filter out passwords produced by PgProtocol.connstr
    FILTER = re.compile(r"(\s*)password=[^\s]+(\s*)")

    def filter(self, record: logging.LogRecord) -> bool:
        """Mask passwords in ``record`` and return True so it is still emitted.

        The message is rendered (``msg % args``) before masking, so a password
        that arrives as a lazy logging argument is hidden as well, not only
        one written directly into the message template.
        """
        try:
            rendered = record.getMessage()
        except (TypeError, ValueError, KeyError):
            # The template and its arguments do not match.  Mask what we can
            # in the template and leave args untouched so the handler reports
            # the formatting error the way it normally would.
            record.msg = self.FILTER.sub(r"\1password=<hidden>\2", str(record.msg))
            return True

        record.msg = self.FILTER.sub(r"\1password=<hidden>\2", rendered)
        # The message is already interpolated; clearing args keeps handlers
        # from applying %-formatting a second time.
        record.args = ()
        return True
def getLogger(name="root") -> logging.Logger:
"""Method to get logger for tests.
@@ -50,6 +38,5 @@ def getLogger(name="root") -> logging.Logger:
# default logger for tests
log = getLogger()
log.addFilter(PasswordFilter())
logging.config.dictConfig(LOGGING)

View File

@@ -59,8 +59,8 @@ Env = Dict[str, str]
Fn = TypeVar("Fn", bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = "test_output"
DEFAULT_POSTGRES_DIR = "pg_install/v14"
DEFAULT_BRANCH_NAME = "main"
DEFAULT_PG_VERSION_DEFAULT = "14"
BASE_PORT = 15000
WORKER_PORT_NUM = 1000
@@ -71,6 +71,7 @@ base_dir = ""
neon_binpath = ""
pg_distrib_dir = ""
top_output_dir = ""
default_pg_version = ""
def pytest_configure(config):
@@ -100,20 +101,36 @@ def pytest_configure(config):
Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation.
global default_pg_version
log.info(f"default_pg_version is {default_pg_version}")
env_default_pg_version = os.environ.get("DEFAULT_PG_VERSION")
if env_default_pg_version:
default_pg_version = env_default_pg_version
log.info(f"default_pg_version is set to {default_pg_version}")
else:
default_pg_version = DEFAULT_PG_VERSION_DEFAULT
global pg_distrib_dir
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR))
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, "pg_install"))
log.info(f"pg_distrib_dir is {pg_distrib_dir}")
psql_bin_path = os.path.join(pg_distrib_dir, "v{}".format(default_pg_version), "bin/psql")
postgres_bin_path = os.path.join(
pg_distrib_dir, "v{}".format(default_pg_version), "bin/postgres"
)
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/psql")):
raise Exception('psql not found at "{}"'.format(pg_distrib_dir))
if not os.path.exists(psql_bin_path):
raise Exception('psql not found at "{}"'.format(psql_bin_path))
else:
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/postgres")):
raise Exception('postgres not found at "{}"'.format(pg_distrib_dir))
if not os.path.exists(postgres_bin_path):
raise Exception('postgres not found at "{}"'.format(postgres_bin_path))
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
@@ -266,10 +283,15 @@ class PgProtocol:
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
"""
Construct a dictionary of connection options from default values and extra parameters.
An option can be dropped from the returning dictionary by None-valued extra parameter.
"""
result = self.default_options.copy()
if "dsn" in kwargs:
result.update(parse_dsn(kwargs["dsn"]))
result.update(kwargs)
result = {k: v for k, v in result.items() if v is not None}
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer, you can
@@ -433,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the tests pass with or without the remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -539,6 +564,7 @@ class NeonEnvBuilder:
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
self.pg_version = default_pg_version
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -560,7 +586,9 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -751,6 +779,7 @@ class NeonEnv:
self.broker = config.broker
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
self.pg_version = config.pg_version
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -964,6 +993,24 @@ class NeonPageserverHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]) -> None:
    """Send failpoint settings to the pageserver's ``/v1/failpoints`` endpoint.

    ``config_strings`` is either a single ``(name, actions)`` pair or a list
    of such pairs.  Each pair becomes a ``{"name": ..., "actions": ...}``
    object in the JSON array that is PUT to the pageserver.  The response is
    handed to ``self.verbose_error`` and its JSON body is asserted to be
    ``None`` (and returned).
    """
    # Accept a bare pair as shorthand for a one-element list.
    pairs = [config_strings] if isinstance(config_strings, tuple) else config_strings

    log.info(f"Requesting config failpoints: {repr(pairs)}")
    url = f"http://localhost:{self.port}/v1/failpoints"
    payload = [{"name": name, "actions": actions} for name, actions in pairs]
    response = self.put(url, json=payload)
    log.info(f"Got failpoints request response code {response.status_code}")

    # NOTE(review): verbose_error is defined elsewhere; presumably it raises
    # on an error status -- confirm.
    self.verbose_error(response)
    body = response.json()
    assert body is None
    return body
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
@@ -1061,6 +1108,45 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def timeline_gc(
    self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
    """Request a GC run for one timeline and return the pageserver's JSON reply.

    PUTs ``{"gc_horizon": gc_horizon}`` to the timeline's ``do_gc`` endpoint,
    passes the response to ``self.verbose_error``, and returns the decoded
    JSON body, which is asserted to be a non-``None`` dict.
    """
    log.info(
        f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}"
    )
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc"
    response = self.put(url, json={"gc_horizon": gc_horizon})
    log.info(f"Got GC request response code: {response.status_code}")

    # NOTE(review): verbose_error is defined elsewhere; presumably it raises
    # on an error status -- confirm.
    self.verbose_error(response)
    gc_result = response.json()
    assert gc_result is not None
    assert isinstance(gc_result, dict)
    return gc_result
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
    """Request compaction of one timeline through the pageserver HTTP API.

    PUTs to the timeline's ``compact`` endpoint without a request body,
    passes the response to ``self.verbose_error``, and asserts that the JSON
    body of the reply is ``None`` (which is also the return value).
    """
    log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
    response = self.put(url)
    log.info(f"Got compact request response code: {response.status_code}")

    # NOTE(review): verbose_error is defined elsewhere; presumably it raises
    # on an error status -- confirm.
    self.verbose_error(response)
    body = response.json()
    assert body is None
    return body
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
    """Request a checkpoint of one timeline through the pageserver HTTP API.

    PUTs to the timeline's ``checkpoint`` endpoint without a request body,
    passes the response to ``self.verbose_error``, and asserts that the JSON
    body of the reply is ``None`` (which is also the return value).
    """
    log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
    response = self.put(url)
    log.info(f"Got checkpoint request response code: {response.status_code}")

    # NOTE(review): verbose_error is defined elsewhere; presumably it raises
    # on an error status -- confirm.
    self.verbose_error(response)
    body = response.json()
    assert body is None
    return body
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
@@ -1194,6 +1280,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
)
else:
@@ -1205,6 +1293,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
+ sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), [])
)
@@ -1230,7 +1320,9 @@ class NeonCli(AbstractNeonCli):
return res
def create_timeline(
self, new_branch_name: str, tenant_id: Optional[TenantId] = None
self,
new_branch_name: str,
tenant_id: Optional[TenantId] = None,
) -> TimelineId:
cmd = [
"timeline",
@@ -1239,6 +1331,8 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
res = self.raw_cli(cmd)
@@ -1252,7 +1346,11 @@ class NeonCli(AbstractNeonCli):
return TimelineId(str(created_timeline_id))
def create_root_branch(self, branch_name: str, tenant_id: Optional[TenantId] = None):
def create_root_branch(
self,
branch_name: str,
tenant_id: Optional[TenantId] = None,
):
cmd = [
"timeline",
"create",
@@ -1260,6 +1358,8 @@ class NeonCli(AbstractNeonCli):
branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
res = self.raw_cli(cmd)
@@ -1329,7 +1429,9 @@ class NeonCli(AbstractNeonCli):
return timelines_cli
def init(
self, config_toml: str, initial_timeline_id: Optional[TimelineId] = None
self,
config_toml: str,
initial_timeline_id: Optional[TimelineId] = None,
) -> "subprocess.CompletedProcess[str]":
with tempfile.NamedTemporaryFile(mode="w+") as tmp:
tmp.write(config_toml)
@@ -1338,6 +1440,9 @@ class NeonCli(AbstractNeonCli):
cmd = ["init", f"--config={tmp.name}"]
if initial_timeline_id:
cmd.extend(["--timeline-id", str(initial_timeline_id)])
cmd.extend(["--pg-version", self.env.pg_version])
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
@@ -1364,7 +1469,10 @@ class NeonCli(AbstractNeonCli):
log.info(f"pageserver_enabled_features success: {res.stdout}")
return json.loads(res.stdout)
def pageserver_start(self, overrides=()) -> "subprocess.CompletedProcess[str]":
def pageserver_start(
self,
overrides=(),
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", *overrides]
append_pageserver_param_overrides(
params_to_update=start_args,
@@ -1419,6 +1527,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id or self.env.initial_tenant),
"--branch-name",
branch_name,
"--pg-version",
self.env.pg_version,
]
if lsn is not None:
args.extend(["--lsn", str(lsn)])
@@ -1443,6 +1553,8 @@ class NeonCli(AbstractNeonCli):
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
if lsn is not None:
args.append(f"--lsn={lsn}")
@@ -1572,11 +1684,13 @@ def append_pageserver_param_overrides(
class PgBin:
"""A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path):
def __init__(self, log_dir: Path, pg_version: str):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "bin")
self.pg_version = pg_version
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.pg_lib_dir = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "lib")
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), "lib")
self.env["LD_LIBRARY_PATH"] = self.pg_lib_dir
def _fixpath(self, command: List[str]):
if "/" not in command[0]:
@@ -1631,8 +1745,8 @@ class PgBin:
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path) -> PgBin:
return PgBin(test_output_dir)
def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
@@ -1710,12 +1824,19 @@ class VanillaPostgres(PgProtocol):
self.stop()
# Session-scoped fixture exposing the Postgres version used for the test run.
# It returns the module-level `default_pg_version`, which pytest_configure
# fills in from the DEFAULT_PG_VERSION environment variable or, when that is
# unset, from DEFAULT_PG_VERSION_DEFAULT.
@pytest.fixture(scope="session")
def pg_version() -> str:
return default_pg_version
@pytest.fixture(scope="function")
def vanilla_pg(
test_output_dir: Path, port_distributor: PortDistributor
test_output_dir: Path,
port_distributor: PortDistributor,
pg_version: str,
) -> Iterator[VanillaPostgres]:
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir)
pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
yield vanilla_pg
@@ -1751,8 +1872,8 @@ class RemotePostgres(PgProtocol):
@pytest.fixture(scope="function")
def remote_pg(test_output_dir: Path) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir)
def remote_pg(test_output_dir: Path, pg_version: str) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir, pg_version)
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
@@ -2481,7 +2602,11 @@ def list_files_to_compare(pgdata_dir: Path):
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres):
def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
pg: Postgres,
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
@@ -2492,7 +2617,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir)
pg_bin = PgBin(test_output_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
cmd = rf"""
@@ -2505,7 +2630,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
psql_env = {"LD_LIBRARY_PATH": pg_bin.pg_lib_dir}
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
# Print captured stdout/stderr if basebackup cmd failed.

View File

@@ -0,0 +1,39 @@
import time
from fixtures.neon_fixtures import NeonEnvBuilder
#
# Benchmark searching the layer map, when there are a lot of small layer files.
#
def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
# Benchmark: fill a table in several batches under a tenant configured to
# produce many small layer files, then time a full-table count.
# NOTE(review): indentation is not visible in this view, so the exact extent
# of the `for` and `with` bodies below should be confirmed against the file.
env = neon_env_builder.init_start()
# Number of insert batches and rows per batch; the final assertion expects
# n_iters * n_records rows in total.
n_iters = 10
n_records = 100000
# We want to have a lot of layer files to exercise the layer map. Make
# gc_horizon and checkpoint_distance very small, so that we get a lot of small layer files.
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "100 m",
"gc_horizon": "1048576",
"checkpoint_distance": "8192",
"compaction_period": "1 s",
"compaction_threshold": "1",
"compaction_target_size": "8192",
}
)
# Create a dedicated timeline for the new tenant and start a compute node on it.
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
cur = pg.connect().cursor()
cur.execute("create table t(x integer)")
# Insert the data in n_iters batches of n_records rows each.
for i in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
# NOTE(review): the pause presumably gives the pageserver time to flush and
# compact (compaction_period is "1 s" above) -- confirm.
time.sleep(1)
cur.execute("vacuum t")
# The measured operation: a count over the whole table, recorded under the
# name "test_query".
with zenbenchmark.record_duration("test_query"):
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (n_iters * n_records,)

View File

@@ -4,7 +4,7 @@ import os
import timeit
from datetime import datetime
from pathlib import Path
from typing import List
from typing import Dict, List
import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
@@ -24,14 +24,18 @@ def utc_now_timestamp() -> int:
return calendar.timegm(datetime.utcnow().utctimetuple())
def init_pgbench(env: PgCompare, cmdline):
def init_pgbench(env: PgCompare, cmdline, password: None):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
# calculate timestamps and durations separately
# timestamp is intended to be used for linking to grafana and logs
# duration is actually a metric and uses float instead of int for timestamp
start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
out = env.pg_bin.run_capture(cmdline)
out = env.pg_bin.run_capture(cmdline, env=environ)
env.flush()
duration = timeit.default_timer() - t0
@@ -48,13 +52,15 @@ def init_pgbench(env: PgCompare, cmdline):
env.zenbenchmark.record_pg_bench_init_result("init", res)
def run_pgbench(env: PgCompare, prefix: str, cmdline):
def run_pgbench(env: PgCompare, prefix: str, cmdline, password: None):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = env.pg_bin.run_capture(
cmdline,
)
out = env.pg_bin.run_capture(cmdline, env=environ)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
@@ -82,11 +88,14 @@ def run_pgbench(env: PgCompare, prefix: str, cmdline):
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
password = env.pg.default_options.get("password", None)
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
# drop password from the connection string by passing password=None and set password separately
connstr = env.pg.connstr(password=None, options=options)
if workload_type == PgBenchLoadType.INIT:
# Run initialize
init_pgbench(
env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options="-cstatement_timeout=1h")]
)
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
@@ -100,8 +109,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
env.pg.connstr(),
connstr,
],
password=password,
)
if workload_type == PgBenchLoadType.SELECT_ONLY:
@@ -116,8 +126,9 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
env.pg.connstr(),
connstr,
],
password=password,
)
env.report_size()

View File

@@ -9,6 +9,7 @@ from fixtures.utils import query_scalar
#
def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.
# Extend compaction_period and gc_period to disable background compaction and gc.
@@ -23,7 +24,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
}
)
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
@@ -92,9 +93,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info(f"LSN after 300k rows: {lsn_300}")
# Run compaction on branch1.
compact = f"compact {tenant} {branch1_timeline} {lsn_200}"
compact = f"compact {tenant} {branch1_timeline}"
log.info(compact)
env.pageserver.safe_psql(compact)
pageserver_http.timeline_compact(tenant, branch1_timeline)
assert query_scalar(branch0_cur, "SELECT count(*) FROM foo") == 100000

View File

@@ -9,9 +9,10 @@ from fixtures.neon_fixtures import NeonEnv
def test_basebackup_error(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_basebackup_error", "empty")
pageserver_http = env.pageserver.http_client()
# Introduce failpoint
env.pageserver.safe_psql("failpoints basebackup-before-control-file=return")
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.postgres.create_start("test_basebackup_error")

View File

@@ -47,6 +47,7 @@ from fixtures.utils import query_scalar
# could not find data for key ... at LSN ..., for request at LSN ...
def test_branch_and_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
tenant, _ = env.neon_cli.create_tenant(
conf={
@@ -84,7 +85,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
# Set the GC horizon so that lsn1 is inside the horizon, which means
# we can create a new branch starting from lsn1.
env.pageserver.safe_psql(f"do_gc {tenant} {timeline_main} {lsn2 - lsn1 + 1024}")
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
@@ -113,6 +114,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
# Disable background GC but set the `pitr_interval` to be small, so GC can delete something
tenant, _ = env.neon_cli.create_tenant(
conf={
@@ -147,10 +150,10 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
# Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the
# branch creation task but the individual timeline GC iteration happens *after*
# the branch creation task.
env.pageserver.safe_psql("failpoints before-timeline-gc=sleep(2000)")
pageserver_http_client.configure_failpoints(("before-timeline-gc", "sleep(2000)"))
def do_gc():
env.pageserver.safe_psql(f"do_gc {tenant} {b0} 0")
pageserver_http_client.timeline_gc(tenant, b0, 0)
thread = threading.Thread(target=do_gc, daemon=True)
thread.start()
@@ -161,7 +164,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
time.sleep(1.0)
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("b1", "b0", tenant_id=tenant, ancestor_start_lsn=lsn)
thread.join()

View File

@@ -1,4 +1,3 @@
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -96,7 +95,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
assert pg.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("test_branch_preinitdb", ancestor_start_lsn=Lsn("0/42"))
# branch at pre-ancestor lsn
@@ -106,13 +105,11 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
)
# check that we cannot create branch based on garbage collected data
with env.pageserver.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advance latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.http_client() as pageserver_http:
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
# this gced_lsn is pretty random, so if gc is disabled this wouldn't fail
env.neon_cli.create_branch(
"test_branch_create_fail", "test_branch_behind", ancestor_start_lsn=gced_lsn

View File

@@ -113,13 +113,14 @@ def test_create_multiple_timelines_parallel(neon_simple_env: NeonEnv):
def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant()
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
# Introduce failpoint when creating a new timeline
env.pageserver.safe_psql("failpoints before-checkpoint-new-timeline=return")
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
_ = env.neon_cli.create_timeline("test_fix_broken_timelines", tenant_id)

View File

@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import random
from fixtures.log_helper import log
@@ -30,10 +31,15 @@ async def update_table(pg: Postgres):
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
psconn = await env.pageserver.connect_async()
pageserver_http = env.pageserver.http_client()
while updates_performed < updates_to_perform:
await psconn.execute(f"do_gc {env.initial_tenant} {timeline} 0")
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(
pool, lambda: pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
)
# At the same time, run UPDATEs and GC

View File

@@ -96,6 +96,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
env.pg_version,
]
)
@@ -248,6 +250,8 @@ def _import(
str(lsn),
"--base-tarfile",
os.path.join(tar_output_file),
"--pg-version",
env.pg_version,
]
)
@@ -270,8 +274,7 @@ def _import(
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)
# Check that gc works
psconn = env.pageserver.connect()
pscur = psconn.cursor()
pscur.execute(f"do_gc {tenant} {timeline} 0")
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_gc(tenant, timeline, 0)
return tar_output_file

View File

@@ -1,4 +1,3 @@
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
@@ -29,8 +28,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# Get the timeline ID of our branch. We need it for the 'do_gc' command
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
psconn = env.pageserver.connect()
pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
pageserver_http = env.pageserver.http_client()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers.
@@ -61,9 +59,8 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
# garbage collections so that the page server will remove old page versions.
for i in range(10):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
gcrow = pscur.fetchone()
print_gc_result(gcrow)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
for j in range(100):
cur.execute("UPDATE foo SET val = val + 1 WHERE id = 1;")

View File

@@ -26,9 +26,9 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, cap
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/regress")
bindir = os.path.join(pg_distrib_dir, "bin")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
@@ -80,9 +80,11 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/isolation")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/isolation")
bindir = os.path.join(pg_distrib_dir, "bin")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
src_path = os.path.join(
base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
@@ -124,9 +126,9 @@ def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, ca
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "bin")
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")

View File

@@ -1,6 +1,3 @@
from contextlib import closing
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
@@ -54,13 +51,11 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
log.info(f"LSN after 10000 rows: {debug_lsn} xid {debug_xid}")
# run GC
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute(f"compact {env.initial_tenant} {timeline}")
# perform aggressive GC. Data still should be kept because of the PITR setting.
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.http_client() as pageserver_http:
pageserver_http.timeline_compact(env.initial_tenant, timeline)
# perform aggressive GC. Data still should be kept because of the PITR setting.
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
# Branch at the point where only 100 rows were inserted
# It must have been preserved by PITR setting

View File

@@ -106,6 +106,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
env.neon_cli.create_branch("test_timetravel", "empty")
pg = env.postgres.create_start("test_timetravel")
@@ -136,7 +137,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to force a new layer file
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http_client.timeline_checkpoint(tenant_id, timeline_id)
##### Restart pageserver
env.postgres.stop_all()

View File

@@ -1,7 +1,6 @@
import time
from contextlib import closing
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -19,8 +18,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
f = env.neon_cli.pageserver_enabled_features()
assert (
"failpoints" in f["features"]
), "Build pageserver with --features=failpoints option to run this test"
"testing" in f["features"]
), "Build pageserver with --features=testing option to run this test"
neon_env_builder.start()
# Create a branch for us
@@ -31,26 +30,28 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")
cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")
with env.pageserver.http_client() as pageserver_http:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")
cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")
# Sleep for some time to let checkpoint create image layers
time.sleep(2)
# Sleep for some time to let checkpoint create image layers
time.sleep(2)
# Configure failpoints
pscur.execute(
"failpoints flush-frozen-before-sync=sleep(2000);checkpoint-after-sync=exit"
)
# Configure failpoints
pageserver_http.configure_failpoints(
[
("flush-frozen-before-sync", "sleep(2000)"),
("checkpoint-after-sync", "exit"),
]
)
# Do some updates until pageserver is crashed
try:
while True:
cur.execute("update foo set x=x+1")
except Exception as err:
log.info(f"Expected server crash {err}")
# Do some updates until pageserver is crashed
try:
while True:
cur.execute("update foo set x=x+1")
except Exception as err:
log.info(f"Expected server crash {err}")
log.info("Wait before server restart")
env.pageserver.stop()

View File

@@ -57,6 +57,7 @@ def test_remote_storage_backup_and_restore(
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
@@ -80,7 +81,7 @@ def test_remote_storage_backup_and_restore(
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
@@ -99,7 +100,7 @@ def test_remote_storage_backup_and_restore(
env.pageserver.start()
# Introduce failpoint in download
env.pageserver.safe_psql("failpoints remote-storage-download-pre-rename=return")
pageserver_http.configure_failpoints(("remote-storage-download-pre-rename", "return"))
client.tenant_attach(tenant_id)

View File

@@ -1,16 +1,21 @@
from threading import Thread
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonPageserverApiException,
NeonPageserverHttpClient,
)
from fixtures.types import TenantId, TimelineId
def do_gc_target(env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
def do_gc_target(
pageserver_http: NeonPageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
try:
env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
except Exception as e:
log.error("do_gc failed: %s", e)
@@ -44,13 +49,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# gc should not try to even start
with pytest.raises(
expected_exception=psycopg2.DatabaseError, match="gc target timeline does not exist"
expected_exception=NeonPageserverApiException, match="gc target timeline does not exist"
):
bogus_timeline_id = TimelineId.generate()
env.pageserver.safe_psql(f"do_gc {tenant_id} {bogus_timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, bogus_timeline_id, 0)
# try to concurrently run gc and detach
gc_thread = Thread(target=lambda: do_gc_target(env, tenant_id, timeline_id))
gc_thread = Thread(target=lambda: do_gc_target(pageserver_http, tenant_id, timeline_id))
gc_thread.start()
last_error = None
@@ -73,6 +78,6 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
with pytest.raises(
expected_exception=psycopg2.DatabaseError, match=f"Tenant {tenant_id} not found"
expected_exception=NeonPageserverApiException, match=f"Tenant {tenant_id} not found"
):
env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)

View File

@@ -147,14 +147,13 @@ def populate_branch(
def ensure_checkpoint(
pageserver_cur,
pageserver_http: NeonPageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
current_lsn: Lsn,
):
# run checkpoint manually to be sure that data landed in remote storage
pageserver_cur.execute(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@@ -324,22 +323,19 @@ def test_tenant_relocation(
# this requirement introduces a problem
# if user creates a branch during migration
# it wont appear on the new pageserver
with pg_cur(env.pageserver) as cur:
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_main,
current_lsn=current_lsn_main,
)
ensure_checkpoint(
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_main,
current_lsn=current_lsn_main,
)
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_second,
current_lsn=current_lsn_second,
)
ensure_checkpoint(
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_second,
current_lsn=current_lsn_second,
)
log.info("inititalizing new pageserver")
# bootstrap second pageserver

View File

@@ -1,4 +1,5 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from prometheus_client.samples import Sample
@@ -19,7 +25,8 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
)
initial_tenant_dirs = set([d for d in tenants_dir.iterdir()])
neon_simple_env.pageserver.safe_psql("failpoints tenant-creation-before-tmp-rename=return")
pageserver_http = neon_simple_env.pageserver.http_client()
pageserver_http.configure_failpoints(("tenant-creation-before-tmp-rename", "return"))
with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"):
_ = neon_simple_env.neon_cli.create_tenant()
@@ -200,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
    "remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
    """Check which tenants the pageserver lists after a restart when they have no timelines.

    Two tenants are prepared before the restart:

    * the initial tenant, whose ``timelines/`` directory is removed from disk;
    * a freshly created tenant whose timelines are all deleted through the
      HTTP API, leaving an empty ``timelines/`` directory behind.

    After the restart the test asserts that only the second tenant is listed
    and that its reported state is ``Active`` with no background jobs running.
    """
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_pageserver_with_empty_tenants",
    )

    env = neon_env_builder.init_start()
    client = env.pageserver.http_client()
    tenants_dir = Path(env.repo_dir) / "tenants"

    # Tenant 1: break it by removing its timelines/ directory altogether.
    tenant_without_timelines_dir = env.initial_tenant
    log.info(
        f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally loses timelines/ directory and is expected to be completely ignored when pageserver restarts"
    )
    shutil.rmtree(tenants_dir / str(tenant_without_timelines_dir) / "timelines")

    # Tenant 2: keep the timelines/ directory, but delete every timeline in it.
    tenant_with_empty_timelines_dir = client.tenant_create()
    log.info(
        f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
    )
    for temp_timeline in client.timeline_list(tenant_with_empty_timelines_dir):
        client.timeline_delete(
            tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
        )
    leftover_entries = list(
        (tenants_dir / str(tenant_with_empty_timelines_dir) / "timelines").iterdir()
    )
    assert (
        not leftover_entries
    ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"

    # Trigger timeline reinitialization after pageserver restart
    env.postgres.stop_all()
    env.pageserver.stop()
    env.pageserver.start()

    # Take a fresh client handle for the restarted pageserver.
    client = env.pageserver.http_client()
    tenants = client.tenant_list()

    assert (
        len(tenants) == 1
    ), "Pageserver should attach only tenants with empty timelines/ dir on restart"
    loaded_tenant = tenants[0]
    assert loaded_tenant["id"] == str(
        tenant_with_empty_timelines_dir
    ), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with timelines/ directory"
    assert loaded_tenant["state"] == {
        "Active": {"background_jobs_running": False}
    }, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,19 +7,25 @@
#
import asyncio
import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
wait_until,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -91,5 +97,95 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
# run final checkpoint manually to flush all the data to remote storage
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
    """Check that a timeline is listed again after one of its local layer files is re-downloaded.

    The test writes data, checkpoints it and waits for the upload twice, then
    stops the pageserver and removes one local file whose name starts with
    ``00000`` from the timeline directory. After the pageserver is started
    again it polls until the tenant reports no downloads in progress and
    asserts that the tenant lists exactly its original timeline.
    """
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        # Use this test's own name, as the other remote storage tests do.
        test_name="test_tenants_attached_after_download",
    )

    data_id = 1
    data_secret = "very secret secret"

    ##### First start, insert secret data and upload it to the remote storage
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()
    pg = env.postgres.create_start("main")

    tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
    timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])

    for checkpoint_number in range(1, 3):
        with pg.cursor() as cur:
            cur.execute(
                f"""
                CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
                INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
                """
            )
            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))

        # wait until pageserver receives that data
        wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)

        # run checkpoint manually to be sure that data landed in remote storage
        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)

        log.info(f"waiting for checkpoint {checkpoint_number} upload")
        # wait until pageserver successfully uploaded a checkpoint to remote storage
        wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
        log.info(f"upload of checkpoint {checkpoint_number} is done")

    ##### Stop the pageserver, erase one of its layer files to force a download from the remote storage
    env.postgres.stop_all()
    env.pageserver.stop()

    timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
    # A file name starting with "00000" looks like a layer file; remove the first one found.
    layer_path = next(
        (path for path in timeline_dir.iterdir() if path.name.startswith("00000")), None
    )
    assert (
        layer_path is not None
    ), f"Found no local layer files to delete in directory {timeline_dir}"
    layer_path.unlink()

    ##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
    env.pageserver.start()
    # Take a fresh client handle for the restarted pageserver.
    pageserver_http = env.pageserver.http_client()

    wait_until(
        number_of_iterations=5,
        interval=1,
        func=lambda: expect_tenant_to_download_timeline(pageserver_http, tenant_id),
    )

    restored_timelines = pageserver_http.timeline_list(tenant_id)
    assert (
        len(restored_timelines) == 1
    ), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
    restored_timeline = restored_timelines[0]
    assert restored_timeline["timeline_id"] == str(
        timeline_id
    ), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
    client: NeonPageserverHttpClient,
    tenant_id: TenantId,
):
    """Raise unless the pageserver lists ``tenant_id`` with no downloads in progress.

    Looks the tenant up in ``client.tenant_list()`` by comparing each entry's
    ``"id"`` with ``str(tenant_id)``. Returns ``None`` on success.

    Raises:
        AssertionError: if the tenant is not listed, or if its
            ``"has_in_progress_downloads"`` value is truthy or absent.
    """
    wanted_id = str(tenant_id)
    for tenant in client.tenant_list():
        if tenant["id"] != wanted_id:
            continue
        # A missing key is treated as "still downloading", hence the True default.
        # Raise explicitly instead of using `assert`, which is removed under `python -O`.
        if tenant.get("has_in_progress_downloads", True):
            raise AssertionError(f"Tenant {tenant_id} should have no downloads in progress")
        return
    raise AssertionError(f"Tenant {tenant_id} is missing on pageserver")

View File

@@ -3,6 +3,7 @@ import random
import re
import time
from contextlib import closing
from pathlib import Path
import psycopg2.errors
import psycopg2.extras
@@ -11,7 +12,10 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
PgBin,
PortDistributor,
Postgres,
VanillaPostgres,
assert_timeline_local,
wait_for_last_flush_lsn,
)
@@ -238,6 +242,7 @@ def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_checkpoint")
pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
@@ -251,7 +256,7 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
@@ -264,6 +269,7 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_compaction")
pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
@@ -278,8 +284,8 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
env.pageserver.safe_psql(f"compact {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
@@ -290,6 +296,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_gc")
pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
@@ -304,7 +311,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pg.safe_psql(
"""
@@ -315,17 +322,23 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"do_gc {env.initial_tenant} {new_timeline_id} 0")
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
# The timeline logical and physical sizes are also exposed as prometheus metrics.
# Test the metrics.
def test_timeline_size_metrics(neon_simple_env: NeonEnv):
def test_timeline_size_metrics(
neon_simple_env: NeonEnv,
test_output_dir: Path,
port_distributor: PortDistributor,
pg_version: str,
):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_metrics")
pg = env.postgres.create_start("test_timeline_size_metrics")
@@ -340,7 +353,7 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
# get the metrics and parse the metric for the current timeline's physical size
metrics = env.pageserver.http_client().get_metrics()
@@ -365,11 +378,28 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv):
assert matches
tl_logical_size_metric = int(matches.group(1))
# An empty database is around 8 MB. There at least 3 databases, 'postgres',
# 'template0', 'template1'. So the total size should be about 32 MB. This isn't
# very accurate and can change with different PostgreSQL versions, so allow a
# couple of MB of slack.
assert math.isclose(tl_logical_size_metric, 32 * 1024 * 1024, abs_tol=2 * 1024 * 1024)
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
vanilla_pg.safe_psql("CREATE TABLE foo (t text)")
vanilla_pg.safe_psql(
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g"""
)
vanilla_size_sum = vanilla_pg.safe_psql(
"select sum(pg_database_size(oid)) from pg_database"
)[0][0]
# Compare the size with Vanilla postgres.
# Allow some slack, because the logical size metric includes some things like
# the SLRUs that are not included in pg_database_size().
assert math.isclose(tl_logical_size_metric, vanilla_size_sum, abs_tol=2 * 1024 * 1024)
# The sum of the sizes of all databases, as seen by pg_database_size(), should also
# be close. Again allow some slack, the logical size metric includes some things like
@@ -382,6 +412,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
random.seed(100)
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
@@ -405,7 +436,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, tenant, timeline)
env.pageserver.safe_psql(f"checkpoint {tenant} {timeline}")
pageserver_http.timeline_checkpoint(tenant, timeline)
timeline_total_size += get_timeline_physical_size(timeline)

View File

@@ -59,9 +59,7 @@ def wait_lsn_force_checkpoint(
)
# force checkpoint to advance remote_consistent_lsn
with closing(ps.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
ps.http_client(auth_token).timeline_checkpoint(tenant_id, timeline_id)
# ensure that remote_consistent_lsn is advanced
wait_for_upload(
@@ -636,6 +634,9 @@ class ProposerPostgres(PgProtocol):
}
basepath = self.pg_bin.run_capture(command, env)
log.info(f"postgres --sync-safekeepers output: {basepath}")
stdout_filename = basepath + ".stdout"
with open(stdout_filename, "r") as stdout_f:
@@ -664,7 +665,9 @@ class ProposerPostgres(PgProtocol):
# insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor,
):
# We don't really need the full environment for this test, just the
@@ -701,6 +704,7 @@ def test_sync_safekeepers(
"begin_lsn": int(begin_lsn),
"epoch_start_lsn": int(epoch_start_lsn),
"truncate_lsn": int(epoch_start_lsn),
"pg_version": int(env.pg_version) * 10000,
},
)
lsn = Lsn(res["inserted_wal"]["end_lsn"])

View File

@@ -26,11 +26,11 @@ def test_wal_restore(
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
with VanillaPostgres(data_dir, PgBin(test_output_dir), port) as restored:
with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
pg_bin.run_capture(
[
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
os.path.join(pg_distrib_dir, "bin"),
os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(data_dir),
str(port),