Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-18 19:02:56 +00:00

Compare commits: release-pr...test-relsi (2 commits: 61af437087, e986896676)
@@ -441,6 +441,11 @@ WAL-log them periodically, from a background worker.

Similarly to replication snapshot files, the CID mapping files generated during VACUUM FULL of a catalog table are WAL-logged.

FIXME: But they're not, AFAICS?

FIXME: However, we do WAL-log the file in pg_logical/mappings. But AFAICS that's WAL-logged
by PostgreSQL too. Why do we need separate WAL-logging for that? See the changes in rewriteheap.c.

### How to get rid of the patch

WAL-log them periodically, from a background worker.
@@ -284,6 +284,9 @@ extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum,
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif

extern PGDLLEXPORT void neon_dump_relsize_cache(void);

extern void neon_writeback(SMgrRelation reln, ForkNumber forknum,
                           BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);
@@ -277,3 +277,62 @@ relsize_shmem_request(void)
    RequestNamedLWLockTranche("neon_relsize", 1);
}
#endif

/*
 * A debugging function, to print the contents of the relsize cache as NOTICE
 * messages. This is exposed in the neon_test_utils extension.
 */
void
neon_dump_relsize_cache(void)
{
    HASH_SEQ_STATUS status;
    RelSizeEntry *entry;
    dlist_iter  iter;
    int         cnt;

    if (relsize_hash_size == 0)
    {
        elog(NOTICE, "relsize cache is disabled");
        return;
    }

    LWLockAcquire(relsize_lock, LW_EXCLUSIVE);

    elog(NOTICE, "stats: size %lu hits: " UINT64_FORMAT " misses " UINT64_FORMAT " writes " UINT64_FORMAT,
         (unsigned long) relsize_ctl->size, relsize_ctl->hits, relsize_ctl->misses, relsize_ctl->writes);

    elog(NOTICE, "hash:");
    cnt = 0;
    hash_seq_init(&status, relsize_hash);
    while ((entry = hash_seq_search(&status)) != NULL)
    {
        cnt++;
        elog(NOTICE, "hash entry %d: rel %u/%u/%u.%u size %u",
             cnt,
             RelFileInfoFmt(entry->tag.rinfo),
             entry->tag.forknum,
             entry->size);
    }

    elog(NOTICE, "LRU:");
    cnt = 0;
    dlist_foreach(iter, &relsize_ctl->lru)
    {
        entry = dlist_container(RelSizeEntry, lru_node, iter.cur);
        cnt++;
        elog(NOTICE, "LRU entry %d: rel %u/%u/%u.%u size %u",
             cnt,
             RelFileInfoFmt(entry->tag.rinfo),
             entry->tag.forknum,
             entry->size);

        if (cnt > relsize_hash_size * 2)
        {
            elog(NOTICE, "broken LRU chain??");
            break;
        }
    }

    LWLockRelease(relsize_lock);
}
@@ -7,7 +7,7 @@ OBJS = \
	neontest.o

EXTENSION = neon_test_utils
-DATA = neon_test_utils--1.3.sql
+DATA = neon_test_utils--1.4.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"

PG_CONFIG = pg_config
@@ -69,3 +69,8 @@ BEGIN
  PERFORM trigger_segfault();
END;
$$;

CREATE FUNCTION dump_relsize_cache()
RETURNS VOID
AS 'MODULE_PATHNAME', 'dump_relsize_cache'
LANGUAGE C PARALLEL UNSAFE;
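With this SQL wrapper in place, the cache contents can be inspected from any session; note that the output arrives as NOTICE messages, not as a result set. A minimal sketch of calling it via psycopg2 (the DSN is hypothetical; in the regress tests, the fixtures' endpoint.connect() returns an equivalent connection):

import psycopg2

# Sketch only: dump the relsize cache and print its NOTICE output.
conn = psycopg2.connect("dbname=postgres")  # hypothetical DSN
cur = conn.cursor()
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils")
cur.execute("SELECT dump_relsize_cache()")
conn.commit()
for notice in conn.notices:  # psycopg2 buffers server NOTICE messages here
    print(notice.strip())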
@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
-default_version = '1.3'
+default_version = '1.4'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true
@@ -45,6 +45,7 @@ PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(trigger_panic);
PG_FUNCTION_INFO_V1(trigger_segfault);
PG_FUNCTION_INFO_V1(dump_relsize_cache);

/*
 * Linkage to functions in neon module.
@@ -60,6 +61,10 @@ typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, B

static neon_read_at_lsn_type neon_read_at_lsn_ptr;

typedef void (*neon_dump_relsize_cache_type) (void);

static neon_dump_relsize_cache_type neon_dump_relsize_cache_ptr;

/*
 * Module initialize function: fetch function pointers for cross-module calls.
 */
@@ -68,12 +73,18 @@ _PG_init(void)
{
    /* Asserts verify that typedefs above match original declarations */
    AssertVariableIsOfType(&neon_read_at_lsn, neon_read_at_lsn_type);
    AssertVariableIsOfType(&neon_dump_relsize_cache, neon_dump_relsize_cache_type);
    neon_read_at_lsn_ptr = (neon_read_at_lsn_type)
        load_external_function("$libdir/neon", "neon_read_at_lsn",
                               true, NULL);

    neon_dump_relsize_cache_ptr = (neon_dump_relsize_cache_type)
        load_external_function("$libdir/neon", "neon_dump_relsize_cache",
                               true, NULL);
}

#define neon_read_at_lsn neon_read_at_lsn_ptr
#define neon_dump_relsize_cache neon_dump_relsize_cache_ptr

/*
 * test_consume_oids(int4), for rapidly consuming OIDs, to test wraparound.
@@ -528,3 +539,11 @@ trigger_segfault(PG_FUNCTION_ARGS)
    *ptr = 42;
    PG_RETURN_VOID();
}


Datum
dump_relsize_cache(PG_FUNCTION_ARGS)
{
    neon_dump_relsize_cache();
    PG_RETURN_VOID();
}
test_runner/regress/test_relsize_cache.py (new file, 187 lines)
@@ -0,0 +1,187 @@
import concurrent.futures
import random
import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv


def test_relsize_cache(neon_simple_env: NeonEnv):
    """Stress test for the relsize cache in compute.

    The test runs a few different workloads in parallel on the same
    table:
    * INSERTs
    * SELECTs with seqscans
    * VACUUM

    The table is created with 100 indexes, to exercise the relation
    extension codepath as much as possible.

    At the same time, we run yet another thread which creates a new
    target table and switches 'tblname', a global variable, so that
    all the other threads start to use the new table too. Sometimes
    (with 50% probability), it also TRUNCATEs the old table after
    switching, so that the relsize "forget" function also gets
    exercised.

    This test was written to test a bug in the locking of the relsize
    cache's LRU list, which led to a corrupted LRU list, causing the
    effective size of the relsize cache to shrink, with the right
    workload, to just a few entries over time as old entries went
    missing from the LRU list and were thus "leaked". This is probably
    more complicated than necessary to reproduce that particular bug,
    but it gives a nice variety of concurrent activities on the
    relsize cache.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("test_relsize_cache", "empty")

    endpoint = env.endpoints.create_start(
        "test_relsize_cache",
        config_lines=[
            # Make the relsize cache small, so that the LRU-based
            # eviction gets exercised
            "neon.relsize_hash_size=100",
            # Use large shared buffers and LFC, so that the test is
            # not slowed down by getpage requests to storage. They are
            # not interesting for this test, and we want as much
            # contention on the relsize cache as possible.
            "shared_buffers='1000 MB'",
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("CREATE EXTENSION amcheck")

    # Function to create the target table
    def create_tbl(wcur, new_tblname: str):
        wcur.execute(f"CREATE TABLE {new_tblname} (x bigint, y bigint, z bigint)")
        for i in range(0, 100):
            wcur.execute(
                f"CREATE INDEX relsize_test_idx_{new_tblname}_{i} ON {new_tblname} (x, y, z)"
            )

    # create initial table
    tblname = "tbl_initial"
    create_tbl(cur, tblname)

    inserters_running = 0
    total_inserts = 0

    # Inserter thread: repeatedly INSERT rows into the current target table
    def insert_thread(id: int):
        nonlocal tblname, inserters_running, total_inserts
        log.info(f"i{id}: inserter thread started")
        with closing(endpoint.connect()) as wconn:
            with wconn.cursor() as wcur:
                wcur.execute("set synchronous_commit=off")

                for i in range(0, 100):
                    this_tblname = tblname
                    wcur.execute(
                        f"INSERT INTO {this_tblname} SELECT 1000000000*random(), g, g FROM generate_series(1, 100) g"
                    )
                    total_inserts += 100
                    log.info(f"i{id}: inserted to {this_tblname}")

        inserters_running -= 1
        log.info(f"inserter thread {id} finished!")

    # This thread periodically creates a new target table
    def switcher_thread():
        nonlocal tblname, inserters_running, total_inserts
        log.info("switcher thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        tblcounter = 0
        while inserters_running > 0:
            time.sleep(0.01)
            old_tblname = tblname

            # Create a new target table and change the global 'tblname' variable to
            # switch to it
            tblcounter += 1
            new_tblname = f"tbl{tblcounter}"
            create_tbl(wcur, new_tblname)
            tblname = new_tblname

            # With 50% probability, also truncate the old table, to exercise the
            # relsize "forget" codepath too
            if random.random() < 0.5:
                wcur.execute(f"TRUNCATE {old_tblname}")

            # print a "progress report"
            log.info(f"switched to {new_tblname} ({total_inserts} inserts done)")

    # Continuously run vacuum on the target table.
    #
    # Vacuum has the effect of invalidating the cached relation size in relcache
    def vacuum_thread():
        nonlocal tblname, inserters_running
        log.info("vacuum thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        while inserters_running > 0:
            wcur.execute(f"vacuum {tblname}")

    # Continuously query the current target table.
    #
    # This actually queries not just the latest target table, but a
    # few of the latest ones. This is implemented by only updating the
    # target table with 10% probability on each iteration. This gives
    # a bit more variability in the relsize entries that are requested
    # from the cache.
    def query_thread(id: int):
        nonlocal tblname, inserters_running
        log.info(f"q{id}: query thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        wcur.execute("set max_parallel_workers_per_gather=0")

        this_tblname = tblname
        while inserters_running > 0:
            if random.random() < 0.1:
                this_tblname = tblname
            wcur.execute(f"select count(*) from {this_tblname}")

        log.info(f"q{id}: query thread finished!")

    # With 'with', this waits for all the threads to finish
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        futures = []

        # Launch all the threads
        f = executor.submit(switcher_thread)
        futures.append(f)
        f = executor.submit(vacuum_thread)
        futures.append(f)

        # 5 inserter threads
        for i in range(0, 5):
            f = executor.submit(insert_thread, i)
            futures.append(f)
            inserters_running += 1

        # 20 query threads
        for i in range(0, 20):
            f = executor.submit(query_thread, i)
            futures.append(f)

        for f in concurrent.futures.as_completed(futures):
            ex = f.exception()
            if ex:
                log.info(f"exception from thread, stopping: {ex}")
                inserters_running = 0  # abort the other threads
            f.result()

    # Finally, run amcheck on all the indexes. Most relsize cache bugs
    # would result in runtime ERRORs, but it doesn't hurt to do more
    # sanity checking.
    cur.execute(
        "select bt_index_check(oid, true) from pg_class where relname like 'relsize_test_idx%'"
    )