mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Merge batch_others and batch_pg_regress. The original idea was to split all the python tests into multiple "batches" and run each batch in parallel as a separate CI job. However, the batch_pg_regress batch was pretty short compared to all the tests in batch_others. We could split batch_others into multiple batches, but it actually seems better to just treat them as one big pool of tests and use pytest's handle the parallelism on its own. If we need to split them across multiple nodes in the future, we could use pytest-shard or something else, instead of managing the batches ourselves. Merge test_neon_regress.py, test_pg_regress.py and test_isolation.py into one file, test_pg_regress.py. Seems more clear to group all pg_regress-based tests into one file, now that they would all be in the same directory.
91 lines
3.2 KiB
Python
91 lines
3.2 KiB
Python
import random
|
|
import threading
|
|
import time
|
|
from typing import List
|
|
|
|
import pytest
|
|
from fixtures.log_helper import log
|
|
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
|
|
from performance.test_perf_pgbench import get_scales_matrix
|
|
|
|
|
|
# Test branch creation
|
|
#
|
|
# This test spawns pgbench in a thread in the background, and creates a branch while
|
|
# pgbench is running. Then it launches pgbench on the new branch, and creates another branch.
|
|
# Repeat `n_branches` times.
|
|
#
|
|
# If 'ty' == 'cascade', each branch is created from the previous branch, so that you end
|
|
# up with a branch of a branch of a branch ... of a branch. With 'ty' == 'flat',
|
|
# each branch is created from the root.
|
|
@pytest.mark.parametrize("n_branches", [10])
|
|
@pytest.mark.parametrize("scale", get_scales_matrix(1))
|
|
@pytest.mark.parametrize("ty", ["cascade", "flat"])
|
|
def test_branching_with_pgbench(
|
|
neon_simple_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
|
|
):
|
|
env = neon_simple_env
|
|
|
|
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
|
|
tenant, _ = env.neon_cli.create_tenant(
|
|
conf={
|
|
"gc_period": "5 s",
|
|
"gc_horizon": f"{1024 ** 2}",
|
|
"checkpoint_distance": f"{1024 ** 2}",
|
|
"compaction_target_size": f"{1024 ** 2}",
|
|
# set PITR interval to be small, so we can do GC
|
|
"pitr_interval": "5 s",
|
|
}
|
|
)
|
|
|
|
def run_pgbench(pg: Postgres):
|
|
connstr = pg.connstr()
|
|
|
|
log.info(f"Start a pgbench workload on pg {connstr}")
|
|
|
|
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
|
pg_bin.run_capture(["pgbench", "-T15", connstr])
|
|
|
|
env.neon_cli.create_branch("b0", tenant_id=tenant)
|
|
pgs: List[Postgres] = []
|
|
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
|
|
|
|
threads: List[threading.Thread] = []
|
|
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
|
|
threads[-1].start()
|
|
|
|
thread_limit = 4
|
|
|
|
for i in range(n_branches):
|
|
# random a delay between [0, 5]
|
|
delay = random.random() * 5
|
|
time.sleep(delay)
|
|
log.info(f"Sleep {delay}s")
|
|
|
|
# If the number of concurrent threads exceeds a threshold, wait for
|
|
# all the threads to finish before spawning a new one. Because the
|
|
# regression tests in this directory are run concurrently in CI, we
|
|
# want to avoid the situation that one test exhausts resources for
|
|
# other tests.
|
|
if len(threads) >= thread_limit:
|
|
for thread in threads:
|
|
thread.join()
|
|
threads = []
|
|
|
|
if ty == "cascade":
|
|
env.neon_cli.create_branch("b{}".format(i + 1), "b{}".format(i), tenant_id=tenant)
|
|
else:
|
|
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
|
|
|
|
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
|
|
|
|
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
|
|
threads[-1].start()
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
for pg in pgs:
|
|
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
|
|
assert res[0] == (100000 * scale,)
|