Files
neon/test_runner/regress/test_branching.py
Heikki Linnakangas 3aca717f3d Reorganize python tests.
Merge batch_others and batch_pg_regress. The original idea was to
split all the python tests into multiple "batches" and run each batch
in parallel as a separate CI job. However, the batch_pg_regress batch
was pretty short compared to all the tests in batch_others. We could
split batch_others into multiple batches, but it actually seems better
to just treat them as one big pool of tests and let pytest handle
the parallelism on its own. If we need to split them across multiple
nodes in the future, we could use pytest-shard or something else,
instead of managing the batches ourselves.

Merge test_neon_regress.py, test_pg_regress.py and test_isolation.py
into one file, test_pg_regress.py. Seems more clear to group all
pg_regress-based tests into one file, now that they would all be in
the same directory.
2022-08-30 18:25:38 +03:00

91 lines
3.2 KiB
Python

import random
import threading
import time
from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from performance.test_perf_pgbench import get_scales_matrix
# Test branch creation
#
# This test spawns pgbench in a thread in the background, and creates a branch while
# pgbench is running. Then it launches pgbench on the new branch, and creates another branch.
# Repeat `n_branches` times.
#
# If 'ty' == 'cascade', each branch is created from the previous branch, so that you end
# up with a branch of a branch of a branch ... of a branch. With 'ty' == 'flat',
# each branch is created from the root.
@pytest.mark.parametrize("n_branches", [10])
@pytest.mark.parametrize("scale", get_scales_matrix(1))
@pytest.mark.parametrize("ty", ["cascade", "flat"])
def test_branching_with_pgbench(
    neon_simple_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
):
    """Create branches while pgbench workloads are running on earlier branches.

    A root branch "b0" is created and a pgbench workload is started on it in
    a background thread. Then, `n_branches` times, the test sleeps for a
    random delay, creates a new branch, starts a Postgres instance on it and
    launches another pgbench workload there. At the end, every Postgres
    instance is checked for the expected number of rows in pgbench_accounts.

    Args:
        neon_simple_env: environment used to create the tenant, the branches
            and the Postgres instances.
        pg_bin: helper used to run the pgbench binary.
        n_branches: number of branches created in addition to "b0".
        scale: pgbench scale factor (passed as -s to `pgbench -i`).
        ty: "cascade" creates each branch from the previous one; any other
            value (the test uses "flat") creates each branch from "b0".

    Raises:
        AssertionError: if a pgbench worker thread raised an exception, or a
            branch ends up with an unexpected pgbench_accounts row count.
    """
    env = neon_simple_env

    # Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
    tenant, _ = env.neon_cli.create_tenant(
        conf={
            "gc_period": "5 s",
            "gc_horizon": f"{1024 ** 2}",
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_target_size": f"{1024 ** 2}",
            # set PITR interval to be small, so we can do GC
            "pitr_interval": "5 s",
        }
    )

    # Thread.join() does not propagate exceptions raised in the thread's
    # target. Workers record their failures here so the test can fail on them
    # instead of silently ignoring a broken pgbench run.
    worker_errors: List[BaseException] = []

    def run_pgbench(pg: Postgres):
        """Initialize pgbench tables on `pg` and run a 15 second workload."""
        try:
            connstr = pg.connstr()
            log.info(f"Start a pgbench workload on pg {connstr}")

            pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
            pg_bin.run_capture(["pgbench", "-T15", connstr])
        except Exception as exc:
            worker_errors.append(exc)
            # Re-raise so the traceback is still reported for the thread.
            raise

    def start_pgbench(pg: Postgres) -> threading.Thread:
        """Start run_pgbench(pg) in a daemon thread and return the thread."""
        worker = threading.Thread(target=run_pgbench, args=(pg,), daemon=True)
        worker.start()
        return worker

    def join_workers(workers: List[threading.Thread]) -> None:
        """Wait for all `workers`, then fail if any worker so far has raised."""
        for worker in workers:
            worker.join()
        assert not worker_errors, f"pgbench worker(s) failed: {worker_errors!r}"

    env.neon_cli.create_branch("b0", tenant_id=tenant)
    pgs: List[Postgres] = [env.postgres.create_start("b0", tenant_id=tenant)]
    threads: List[threading.Thread] = [start_pgbench(pgs[0])]

    thread_limit = 4

    for i in range(n_branches):
        # Wait for a random delay in [0, 5) seconds before branching, so that
        # the branch point lands at a varying stage of the running workloads.
        delay = random.random() * 5
        time.sleep(delay)
        log.info(f"Sleep {delay}s")

        # If the number of concurrent threads exceeds a threshold, wait for
        # all the threads to finish before spawning a new one. Because the
        # regression tests in this directory are run concurrently in CI, we
        # want to avoid the situation that one test exhausts resources for
        # other tests.
        if len(threads) >= thread_limit:
            join_workers(threads)
            threads = []

        new_branch = f"b{i + 1}"
        # "cascade" branches off the most recently created branch; otherwise
        # every branch is created directly from the root branch "b0".
        parent_branch = f"b{i}" if ty == "cascade" else "b0"
        env.neon_cli.create_branch(new_branch, parent_branch, tenant_id=tenant)

        pgs.append(env.postgres.create_start(new_branch, tenant_id=tenant))
        threads.append(start_pgbench(pgs[-1]))

    join_workers(threads)

    # Each branch was initialized with `pgbench -i` at the given scale, which
    # is expected to leave 100000 rows per scale unit in pgbench_accounts.
    for pg in pgs:
        res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
        assert res[0] == (100000 * scale,)