neon/test_runner/regress/test_relsize_cache.py
Heikki Linnakangas 61af437087 Add relsize cache stress test
I wrote this as a regression test for the bug fixed in #8807, but it
seems like a useful test in general.

Without the fix from PR #8807, this readily fails with an assertion
failure or other errors.

XXX: This is failing on 'main', even with the fix from #8807, just not
as quickly. This uses a small neon.relsize_hash_size=100 setting; when
I bump it up to 1000 or more, it doesn't fail anymore. But this
suggests that it's still possible to "overwhelm" the relsize cache.
2024-08-23 14:38:05 +03:00


import concurrent.futures
import random
import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
def test_relsize_cache(neon_simple_env: NeonEnv):
    """Stress test for the relsize cache in compute.

    The test runs a few different workloads in parallel on the same
    table:

    * INSERTs
    * SELECT with seqscan
    * VACUUM

    The table is created with 100 indexes, to exercise the relation
    extension codepath as much as possible.

    At the same time, we run yet another thread which creates a new
    target table and switches 'tblname', a global variable, so that
    all the other threads start to use the new table too. Sometimes
    (with 50% probability) it also TRUNCATEs the old table after
    switching, so that the relsize "forget" function gets exercised
    as well.

    This test was written for a bug in the locking of the relsize
    cache's LRU list, which, with the right workload, led to a
    corrupted LRU list: old entries went missing from the list and
    were thus "leaked", causing the effective size of the relsize
    cache to shrink to just a few entries over time. This is probably
    more complicated than necessary to reproduce that particular bug,
    but it gives a nice variety of concurrent activities on the
    relsize cache.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("test_relsize_cache", "empty")
    endpoint = env.endpoints.create_start(
        "test_relsize_cache",
        config_lines=[
            # Make the relsize cache small, so that the LRU-based
            # eviction gets exercised
            "neon.relsize_hash_size=100",
            # Use large shared buffers and LFC, so that the test is not
            # slowed down by getpage requests to storage. They are not
            # interesting for this test, and we want as much
            # contention on the relsize cache as possible.
            "shared_buffers='1000 MB'",
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
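    # amcheck provides bt_index_check(), used at the very end of the
    # test to verify all the indexes that the test creates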
cur.execute("CREATE EXTENSION amcheck")
    # Function to create a target table, with 100 indexes
    def create_tbl(wcur, new_tblname: str):
        wcur.execute(f"CREATE TABLE {new_tblname} (x bigint, y bigint, z bigint)")
        for i in range(0, 100):
            wcur.execute(
                f"CREATE INDEX relsize_test_idx_{new_tblname}_{i} ON {new_tblname} (x, y, z)"
            )

    # Create the initial table
    tblname = "tbl_initial"
    create_tbl(cur, tblname)
    inserters_running = 0
    total_inserts = 0

    # Worker that INSERTs rows into the current target table
    def insert_thread(id: int):
        nonlocal tblname, inserters_running, total_inserts
        log.info(f"i{id}: inserter thread started")
        with closing(endpoint.connect()) as wconn:
            with wconn.cursor() as wcur:
                # Asynchronous commit makes the inserts faster; durability
                # is not important for this test
                wcur.execute("set synchronous_commit=off")
                for i in range(0, 100):
                    this_tblname = tblname
                    wcur.execute(
                        f"INSERT INTO {this_tblname} SELECT 1000000000*random(), g, g FROM generate_series(1, 100) g"
                    )
                    total_inserts += 100
                    log.info(f"i{id}: inserted to {this_tblname}")
        inserters_running -= 1
        log.info(f"inserter thread {id} finished!")
    # This thread periodically creates a new target table
    def switcher_thread():
        nonlocal tblname, inserters_running, total_inserts
        log.info("switcher thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        tblcounter = 0
        while inserters_running > 0:
            time.sleep(0.01)
            old_tblname = tblname

            # Create a new target table and change the global 'tblname'
            # variable to switch to it
            tblcounter += 1
            new_tblname = f"tbl{tblcounter}"
            create_tbl(wcur, new_tblname)
            tblname = new_tblname

            # With 50% probability, also truncate the old table, to
            # exercise the relsize "forget" codepath too
            if random.random() < 0.5:
                wcur.execute(f"TRUNCATE {old_tblname}")

            # Print a "progress report"
            log.info(f"switched to {new_tblname} ({total_inserts} inserts done)")
    # Continuously run vacuum on the target table.
    #
    # Vacuum has the effect of invalidating the cached relation size in
    # the relcache.
    def vacuum_thread():
        nonlocal tblname, inserters_running
        log.info("vacuum thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        while inserters_running > 0:
            wcur.execute(f"vacuum {tblname}")
    # Continuously query the current target table.
    #
    # This actually queries not just the latest target table, but a few
    # of the latest ones. This is implemented by only updating the
    # target table with 10% probability on each iteration, which gives
    # a bit more variability in the relsize entries that are requested
    # from the cache.
    def query_thread(id: int):
        nonlocal tblname, inserters_running
        log.info(f"q{id}: query thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        # Disable parallel plans, so that each query thread runs in a
        # single backend
        wcur.execute("set max_parallel_workers_per_gather=0")
        this_tblname = tblname
        while inserters_running > 0:
            if random.random() < 0.1:
                this_tblname = tblname
            wcur.execute(f"select count(*) from {this_tblname}")
        log.info(f"q{id}: query thread finished!")
    # The 'with' block waits for all the threads to finish
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        futures = []

        # Launch all the threads
        f = executor.submit(switcher_thread)
        futures.append(f)
        f = executor.submit(vacuum_thread)
        futures.append(f)
        # 5 inserter threads. Bump the counter before submitting, so that
        # a fast inserter cannot finish before it is counted.
        for i in range(0, 5):
            inserters_running += 1
            f = executor.submit(insert_thread, i)
            futures.append(f)
        # 20 query threads
        for i in range(0, 20):
            f = executor.submit(query_thread, i)
            futures.append(f)

        for f in concurrent.futures.as_completed(futures):
            ex = f.exception()
            if ex:
                log.info(f"exception from thread, stopping: {ex}")
                inserters_running = 0  # abort the other threads
            f.result()  # re-raises the exception, if any, failing the test
    # Finally, run amcheck on all the indexes. Most relsize cache bugs
    # would result in runtime ERRORs, but it doesn't hurt to do some
    # extra sanity checking.
    cur.execute(
        "select bt_index_check(oid, true) from pg_class where relname like 'relsize_test_idx%'"
    )