Compare commits


2 Commits

Author SHA1 Message Date
Heikki Linnakangas
61af437087 Add relsize cache stress test
I wrote this as a regression test for the bug fixed in #8807, but it
seems like a useful test in general.

Without the fix from PR #8807, this readily fails with an assertion
failure or other errors.

XXX: This is still failing on 'main', even with the fix from #8807, just
not as quickly. The test uses a small neon.relsize_hash_size=100 setting;
when I bump it up to 1000 or more, it doesn't fail anymore. This suggests
that it is still possible to "overwhelm" the relsize cache.
2024-08-23 14:38:05 +03:00
Heikki Linnakangas
e986896676 Add dump_relsize_cache() function
Quite useful for seeing what's in the cache.
2024-08-23 14:37:57 +03:00
8 changed files with 280 additions and 2 deletions
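
As a quick sketch of how the new dump_relsize_cache() function can be exercised (this snippet is not part of the diff below; it assumes the same `endpoint` fixture and `log` helper as the new Python test, and that endpoint.connect() returns a psycopg2 connection, whose `notices` list collects server NOTICE messages):

# Hypothetical usage sketch, not part of this PR. Assumes the 'endpoint'
# fixture used by the test below and a psycopg2 connection.
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils")
cur.execute("SELECT dump_relsize_cache()")
# dump_relsize_cache() reports the cache contents as NOTICE messages;
# psycopg2 accumulates those on conn.notices.
for notice in conn.notices:
    log.info(notice.strip())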

View File

@@ -441,6 +441,11 @@ WAL-log them periodically, from a background worker.
Similarly to replication snapshot files, the CID mapping files generated during VACUUM FULL of a catalog table are WAL-logged
FIXME: But they're not, AFAICS?
FIXME: However, we do WAL-log the file in pg_logical/mappings. But AFAICS that's WAL-logged
by PostgreSQL too. Why do we need separate WAL-logging for that? See the changes in rewriteheap.c
### How to get rid of the patch
WAL-log them periodically, from a background worker.

View File

@@ -284,6 +284,9 @@ extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum,
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif
extern PGDLLEXPORT void neon_dump_relsize_cache(void);
extern void neon_writeback(SMgrRelation reln, ForkNumber forknum,
                           BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);

View File

@@ -277,3 +277,62 @@ relsize_shmem_request(void)
    RequestNamedLWLockTranche("neon_relsize", 1);
}
#endif

/*
 * A debugging function, to print the contents of the relsize cache as NOTICE
 * messages. This is exposed in the neon_test_utils extension.
 */
void
neon_dump_relsize_cache(void)
{
    HASH_SEQ_STATUS status;
    RelSizeEntry *entry;
    dlist_iter  iter;
    int         cnt;

    if (relsize_hash_size == 0)
    {
        elog(NOTICE, "relsize cache is disabled");
        return;
    }

    LWLockAcquire(relsize_lock, LW_EXCLUSIVE);

    elog(NOTICE, "stats: size %lu hits: " UINT64_FORMAT " misses " UINT64_FORMAT " writes " UINT64_FORMAT,
         (unsigned long) relsize_ctl->size, relsize_ctl->hits, relsize_ctl->misses, relsize_ctl->writes);

    elog(NOTICE, "hash:");
    cnt = 0;
    hash_seq_init(&status, relsize_hash);
    while ((entry = hash_seq_search(&status)) != NULL)
    {
        cnt++;
        elog(NOTICE, "hash entry %d: rel %u/%u/%u.%u size %u",
             cnt,
             RelFileInfoFmt(entry->tag.rinfo),
             entry->tag.forknum,
             entry->size);
    }

    elog(NOTICE, "LRU:");
    cnt = 0;
    dlist_foreach(iter, &relsize_ctl->lru)
    {
        entry = dlist_container(RelSizeEntry, lru_node, iter.cur);
        cnt++;
        elog(NOTICE, "LRU entry %d: rel %u/%u/%u.%u size %u",
             cnt,
             RelFileInfoFmt(entry->tag.rinfo),
             entry->tag.forknum,
             entry->size);
        if (cnt > relsize_hash_size * 2)
        {
            elog(NOTICE, "broken LRU chain??");
            break;
        }
    }

    LWLockRelease(relsize_lock);
}
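
For reference, the function's output follows the elog() format strings above; a run against a small cache would look roughly like this (all values here are purely illustrative):

NOTICE:  stats: size 100 hits: 123456 misses 789 writes 1011
NOTICE:  hash:
NOTICE:  hash entry 1: rel 1663/16384/16397.0 size 42
NOTICE:  LRU:
NOTICE:  LRU entry 1: rel 1663/16384/16397.0 size 42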

View File

@@ -7,7 +7,7 @@ OBJS = \
	neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.3.sql
DATA = neon_test_utils--1.4.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -69,3 +69,8 @@ BEGIN
    PERFORM trigger_segfault();
END;
$$;

CREATE FUNCTION dump_relsize_cache()
RETURNS VOID
AS 'MODULE_PATHNAME', 'dump_relsize_cache'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.3'
default_version = '1.4'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -45,6 +45,7 @@ PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(trigger_panic);
PG_FUNCTION_INFO_V1(trigger_segfault);
PG_FUNCTION_INFO_V1(dump_relsize_cache);
/*
 * Linkage to functions in neon module.
@@ -60,6 +61,10 @@ typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, B
static neon_read_at_lsn_type neon_read_at_lsn_ptr;

typedef void (*neon_dump_relsize_cache_type) (void);
static neon_dump_relsize_cache_type neon_dump_relsize_cache_ptr;

/*
 * Module initialize function: fetch function pointers for cross-module calls.
 */
@@ -68,12 +73,18 @@ _PG_init(void)
{
    /* Asserts verify that typedefs above match original declarations */
    AssertVariableIsOfType(&neon_read_at_lsn, neon_read_at_lsn_type);
    AssertVariableIsOfType(&neon_dump_relsize_cache, neon_dump_relsize_cache_type);

    neon_read_at_lsn_ptr = (neon_read_at_lsn_type)
        load_external_function("$libdir/neon", "neon_read_at_lsn",
                               true, NULL);
    neon_dump_relsize_cache_ptr = (neon_dump_relsize_cache_type)
        load_external_function("$libdir/neon", "neon_dump_relsize_cache",
                               true, NULL);
}

#define neon_read_at_lsn neon_read_at_lsn_ptr
#define neon_dump_relsize_cache neon_dump_relsize_cache_ptr

/*
 * test_consume_oids(int4), for rapidly consuming OIDs, to test wraparound.
@@ -528,3 +539,11 @@ trigger_segfault(PG_FUNCTION_ARGS)
    *ptr = 42;
    PG_RETURN_VOID();
}

Datum
dump_relsize_cache(PG_FUNCTION_ARGS)
{
    neon_dump_relsize_cache();

    PG_RETURN_VOID();
}

View File

@@ -0,0 +1,187 @@
import concurrent.futures
import random
import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv


def test_relsize_cache(neon_simple_env: NeonEnv):
    """Stress tests the relsize cache in compute.

    The test runs a few different workloads in parallel on the same
    table:

    * INSERTs
    * SELECT with seqscan
    * VACUUM

    The table is created with 100 indexes, to exercise the relation
    extension codepath as much as possible.

    At the same time, we run yet another thread which creates a new
    target table and switches 'tblname', a global variable, so that
    all the other threads start to use the new table too. Sometimes
    (with 50% probability), it also TRUNCATEs the old table after
    switching, so that the relsize "forget" function also gets
    exercised.

    This test was written to test a bug in the locking of the relsize
    cache's LRU list, which led to a corrupted LRU list: with the
    right workload, old entries went missing from the LRU list and
    were thus "leaked", causing the effective size of the relsize
    cache to shrink to just a few entries over time. This is probably
    more complicated than necessary to reproduce that particular bug,
    but it gives a nice variety of concurrent activities on the
    relsize cache.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("test_relsize_cache", "empty")
    endpoint = env.endpoints.create_start(
        "test_relsize_cache",
        config_lines=[
            # Make the relsize cache small, so that the LRU-based
            # eviction gets exercised
            "neon.relsize_hash_size=100",
            # Use large shared buffers and LFC, so that the test is
            # not slowed down by getpage requests to storage. They are
            # not interesting for this test, and we want as much
            # contention on the relsize cache as possible.
            "shared_buffers='1000 MB'",
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )
    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("CREATE EXTENSION amcheck")

    # Function to create the target table
    def create_tbl(wcur, new_tblname: str):
        wcur.execute(f"CREATE TABLE {new_tblname} (x bigint, y bigint, z bigint)")
        for i in range(0, 100):
            wcur.execute(
                f"CREATE INDEX relsize_test_idx_{new_tblname}_{i} ON {new_tblname} (x, y, z)"
            )

    # create initial table
    tblname = "tbl_initial"
    create_tbl(cur, tblname)

    inserters_running = 0
    total_inserts = 0

    # This thread inserts rows into the current target table, in
    # batches of 100 rows
    def insert_thread(id: int):
        nonlocal tblname, inserters_running, total_inserts
        log.info(f"i{id}: inserter thread started")
        with closing(endpoint.connect()) as wconn:
            with wconn.cursor() as wcur:
                wcur.execute("set synchronous_commit=off")
                for i in range(0, 100):
                    this_tblname = tblname
                    wcur.execute(
                        f"INSERT INTO {this_tblname} SELECT 1000000000*random(), g, g FROM generate_series(1, 100) g"
                    )
                    total_inserts += 100
                    log.info(f"i{id}: inserted to {this_tblname}")
        inserters_running -= 1
        log.info(f"inserter thread {id} finished!")

    # This thread periodically creates a new target table
    def switcher_thread():
        nonlocal tblname, inserters_running, total_inserts
        log.info("switcher thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        tblcounter = 0
        while inserters_running > 0:
            time.sleep(0.01)
            old_tblname = tblname

            # Create a new target table and change the global 'tblname'
            # variable to switch to it
            tblcounter += 1
            new_tblname = f"tbl{tblcounter}"
            create_tbl(wcur, new_tblname)
            tblname = new_tblname

            # With 50% probability, also truncate the old table, to exercise
            # the relsize "forget" codepath too
            if random.random() < 0.5:
                wcur.execute(f"TRUNCATE {old_tblname}")

            # print a "progress report"
            log.info(f"switched to {new_tblname} ({total_inserts} inserts done)")

    # Continuously run vacuum on the target table.
    #
    # Vacuum has the effect of invalidating the cached relation size in
    # relcache
    def vacuum_thread():
        nonlocal tblname, inserters_running
        log.info("vacuum thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        while inserters_running > 0:
            wcur.execute(f"vacuum {tblname}")

    # Continuously query the current target table.
    #
    # This actually queries not just the latest target table, but a
    # few of the latest ones. This is implemented by only updating the
    # target table with 10% probability on each iteration. This gives
    # a bit more variability in the relsize entries that are requested
    # from the cache.
    def query_thread(id: int):
        nonlocal tblname, inserters_running
        log.info(f"q{id}: query thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        wcur.execute("set max_parallel_workers_per_gather=0")
        this_tblname = tblname
        while inserters_running > 0:
            if random.random() < 0.1:
                this_tblname = tblname
            wcur.execute(f"select count(*) from {this_tblname}")
        log.info(f"q{id}: query thread finished!")

    # With 'with', this waits for all the threads to finish
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        futures = []

        # Launch all the threads
        f = executor.submit(switcher_thread)
        futures.append(f)
        f = executor.submit(vacuum_thread)
        futures.append(f)

        # 5 inserter threads
        for i in range(0, 5):
            f = executor.submit(insert_thread, i)
            futures.append(f)
            inserters_running += 1

        # 20 query threads
        for i in range(0, 20):
            f = executor.submit(query_thread, i)
            futures.append(f)

        for f in concurrent.futures.as_completed(futures):
            ex = f.exception()
            if ex:
                log.info(f"exception from thread, stopping: {ex}")
                inserters_running = 0  # abort the other threads
                f.result()

    # Finally, run amcheck on all the indexes. Most relsize cache bugs
    # would result in runtime ERRORs, but it doesn't hurt to do more
    # sanity checking.
    cur.execute(
        "select bt_index_check(oid, true) from pg_class where relname like 'relsize_test_idx%'"
    )