I wrote this as a regression test for the bug fixed in #8807, but it seems like a useful test in general. Without the fix from PR #8807, this readily fails with an assertion failure or other errors.

XXX: This is still failing on 'main', even with the fix from #8807, just not as quickly. The test uses a small neon.relsize_hash_size=100 setting; when I bump it up to 1000 or more, it no longer fails. But this suggests that it's still possible to "overwhelm" the relsize cache.
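To reproduce locally, a test like this would typically be run through the repository's pytest wrapper, e.g. `./scripts/pytest -k test_relsize_cache`; the exact wrapper path and test file location are assumptions about the usual neon test-suite layout, not something stated here.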
import concurrent.futures
import random
import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar


def test_relsize_cache(neon_simple_env: NeonEnv):
    """Stress test the relsize cache in compute.

    The test runs a few different workloads in parallel on the same
    table:

    * INSERTs
    * SELECTs with a seqscan
    * VACUUM

    The table is created with 100 indexes, to exercise the relation
    extension codepath as much as possible.

    At the same time, we run yet another thread which creates a new
    target table and switches 'tblname', a global variable, so that
    all the other threads start using the new table too. Sometimes
    (with 50% probability) it also TRUNCATEs the old table after
    switching, so that the relsize "forget" function gets exercised
    as well.

    This test was written for a bug in the locking of the relsize
    cache's LRU list, which led to a corrupted LRU list. With the
    right workload, the effective size of the relsize cache would
    shrink to just a few entries over time, as old entries went
    missing from the LRU list and were thus "leaked". This is
    probably more complicated than necessary to reproduce that
    particular bug, but it gives a nice variety of concurrent
    activities on the relsize cache.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("test_relsize_cache", "empty")

    endpoint = env.endpoints.create_start(
        "test_relsize_cache",
        config_lines=[
            # Make the relsize cache small, so that the LRU-based
            # eviction gets exercised
            "neon.relsize_hash_size=100",
            # Use large shared_buffers and LFC, so that the test is not
            # slowed down by getpage requests to storage. They are not
            # interesting for this test, and we want as much contention
            # on the relsize cache as possible.
            "shared_buffers='1000 MB'",
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("CREATE EXTENSION amcheck")

    # Function to create the target table
    def create_tbl(wcur, new_tblname: str):
        wcur.execute(f"CREATE TABLE {new_tblname} (x bigint, y bigint, z bigint)")
        for i in range(0, 100):
            wcur.execute(f"CREATE INDEX relsize_test_idx_{new_tblname}_{i} ON {new_tblname} (x, y, z)")

    # create initial table
    tblname = "tbl_initial"
    create_tbl(cur, tblname)
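
    # Shared state used to coordinate the worker threads below: 'tblname' is
    # the current target table, 'inserters_running' tells the helper threads
    # when to stop, and 'total_inserts' is only used for progress reporting.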
    inserters_running = 0
    total_inserts = 0

    # Inserter thread: repeatedly INSERT rows into whatever table 'tblname'
    # currently points to.
    def insert_thread(id: int):
        nonlocal tblname, inserters_running, total_inserts
        log.info(f"i{id}: inserter thread started")
        with closing(endpoint.connect()) as wconn:
            with wconn.cursor() as wcur:
                wcur.execute("set synchronous_commit=off")

                for i in range(0, 100):
                    this_tblname = tblname
                    wcur.execute(
                        f"INSERT INTO {this_tblname} SELECT 1000000000*random(), g, g FROM generate_series(1, 100) g"
                    )
                    total_inserts += 100
                    log.info(f"i{id}: inserted to {this_tblname}")

        inserters_running -= 1
        log.info(f"inserter thread {id} finished!")

    # This thread periodically creates a new target table
    def switcher_thread():
        nonlocal tblname, inserters_running, total_inserts
        log.info("switcher thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        tblcounter = 0
        while inserters_running > 0:
            time.sleep(0.01)
            old_tblname = tblname

            # Create a new target table and change the global 'tblname'
            # variable to switch to it
            tblcounter += 1
            new_tblname = f"tbl{tblcounter}"
            create_tbl(wcur, new_tblname)
            tblname = new_tblname

            # With 50% probability, also truncate the old table, to exercise
            # the relsize "forget" codepath too
            if random.random() < 0.5:
                wcur.execute(f"TRUNCATE {old_tblname}")

            # Print a progress report
            log.info(f"switched to {new_tblname} ({total_inserts} inserts done)")

    # Continuously run vacuum on the target table.
    #
    # Vacuum has the effect of invalidating the cached relation size in relcache
    def vacuum_thread():
        nonlocal tblname, inserters_running
        log.info("vacuum thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        while inserters_running > 0:
            wcur.execute(f"vacuum {tblname}")

    # Continuously query the current target table.
    #
    # This actually queries not just the latest target table, but a few of
    # the latest ones. This is implemented by only updating the target table
    # with 10% probability on each iteration. This gives a bit more
    # variability in the relsize entries that are requested from the cache.
    def query_thread(id: int):
        nonlocal tblname, inserters_running
        log.info(f"q{id}: query thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        # Disable parallel workers, so that the seqscan runs in this backend
        wcur.execute("set max_parallel_workers_per_gather=0")

        this_tblname = tblname
        while inserters_running > 0:
            if random.random() < 0.1:
                this_tblname = tblname
            wcur.execute(f"select count(*) from {this_tblname}")

        log.info(f"q{id}: query thread finished!")

    # The 'with' block waits for all the threads to finish
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        futures = []

        # Launch all the threads
        f = executor.submit(switcher_thread)
        futures.append(f)
        f = executor.submit(vacuum_thread)
        futures.append(f)

        # 5 inserter threads
        for i in range(0, 5):
            f = executor.submit(insert_thread, i)
            futures.append(f)
            inserters_running += 1

        # 20 query threads
        for i in range(0, 20):
            f = executor.submit(query_thread, i)
            futures.append(f)
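
        # Wait for the threads. If one of them fails, tell the others to
        # stop, and re-raise the exception so that the test fails.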
        for f in concurrent.futures.as_completed(futures):
            ex = f.exception()
            if ex:
                log.info(f"exception from thread, stopping: {ex}")
                inserters_running = 0  # abort the other threads
            f.result()

    # Finally, run amcheck on all the indexes. Most relsize cache bugs
    # would result in runtime ERRORs, but it doesn't hurt to do some more
    # sanity checking.
    cur.execute(
        "select bt_index_check(oid, true) from pg_class where relname like 'relsize_test_idx%'"
    )