From f98e54186fb1a792c7261104c18db46d1908a012 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 30 Jan 2025 18:04:28 +0200 Subject: [PATCH] Implement prewarm using lfc_prefetch --- pgxn/neon/Makefile | 2 + pgxn/neon/communicator.c | 21 ++ pgxn/neon/file_cache.c | 323 +++++++++++++++++++++++- pgxn/neon/libpagestore.c | 2 +- pgxn/neon/neon--1.5--1.6.sql | 22 ++ pgxn/neon/neon--1.6--1.5.sql | 7 + pgxn/neon/neon.h | 1 - pgxn/neon/pagestore_client.h | 1 - test_runner/regress/test_lfc_prewarm.py | 130 ++++++++++ 9 files changed, 505 insertions(+), 4 deletions(-) create mode 100644 pgxn/neon/neon--1.5--1.6.sql create mode 100644 pgxn/neon/neon--1.6--1.5.sql create mode 100644 test_runner/regress/test_lfc_prewarm.py diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 426b176af9..8bcc6bf924 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -36,6 +36,8 @@ DATA = \ neon--1.2--1.3.sql \ neon--1.3--1.4.sql \ neon--1.4--1.5.sql \ + neon--1.5--1.6.sql \ + neon--1.6--1.5.sql \ neon--1.5--1.4.sql \ neon--1.4--1.3.sql \ neon--1.3--1.2.sql \ diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index db3e053321..142590c66d 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -788,6 +788,27 @@ prefetch_read(PrefetchRequest *slot) } } + +/* + * Wait completion of previosly registered prefetch request. + * Prefetch result should be placed in LFC by prefetch_wait_for. + */ +bool +prefetch_receive(BufferTag tag) +{ + PrfHashEntry *entry; + PrefetchRequest hashkey; + + hashkey.buftag = tag; + entry = prfh_lookup(MyPState->prf_hash, &hashkey); + if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index)) + { + prefetch_set_unused(entry->slot->my_ring_index); + return true; + } + return false; +} + /* * Disconnect hook - drop prefetches when the connection drops * diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8c2990e57a..2da24d53f9 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -95,6 +95,7 @@ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) +#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) /* * Blocks are read or written to LFC file outside LFC critical section. @@ -119,10 +120,17 @@ typedef struct FileCacheEntry uint32 hash; uint32 offset; uint32 access_count; - uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */ + uint32 state[CHUNK_BITMAP_SIZE * 2]; /* two bits per block */ dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; +typedef struct PrewarmRequest +{ + NeonRequestId reqid; + XLogRecPtr lsn; + XLogRecPtr not_modified_since; +} PrewarmRequest; + #define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3) #define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2)) @@ -144,6 +152,10 @@ typedef struct FileCacheControl uint64 time_write; /* time spent writing (us) */ uint64 resizes; /* number of LFC resizes */ uint64 evicted_pages; /* number of evicted pages */ + uint32 prewarm_total_chunks; + uint32 prewarm_curr_chunk; + uint32 prewarmed_pages; + uint32 skipped_pages; dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ @@ -153,11 +165,19 @@ typedef struct FileCacheControl bool lfc_store_prefetch_result; +typedef struct FileCacheStateEntry +{ + BufferTag key; + uint32 bitmap[CHUNK_BITMAP_SIZE]; +} FileCacheStateEntry; + static HTAB *lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_prewarm_limit; +static int lfc_prewarm_batch; static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; @@ -508,6 +528,32 @@ lfc_init(void) NULL, NULL); + DefineCustomIntVariable("neon.file_cache_prewarm_limit", + "Maximal number of prewarmed pages", + NULL, + &lfc_prewarm_limit, + 0, /* disabled by default */ + 0, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("neon.file_cache_prewarm_batch", + "Number of pages retrivied by prewarm from page server", + NULL, + &lfc_prewarm_batch, + 64, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + if (lfc_max_size == 0) return; @@ -521,6 +567,212 @@ lfc_init(void) #endif } +static FileCacheStateEntry* +lfc_get_state(size_t* n_entries) +{ + size_t max_entries = *n_entries; + size_t i = 0; + FileCacheStateEntry* fs; + + if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */ + return NULL; + + fs = (FileCacheStateEntry*)palloc0(sizeof(FileCacheStateEntry) * max_entries); + + LWLockAcquire(lfc_lock, LW_SHARED); + + if (LFC_ENABLED()) + { + dlist_iter iter; + dlist_reverse_foreach(iter, &lfc_ctl->lru) + { + FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); + memcpy(&fs[i].key, &entry->key, sizeof entry->key); + for (int j = 0; j < BLOCKS_PER_CHUNK; j++) + { + if (GET_STATE(entry, j) != UNAVAILABLE) + fs[i].bitmap[j >> 5] |= (uint32)1 << (j & 31); + } + if (++i == max_entries) + break; + } + elog(LOG, "LFC: save state of %ld chunks", (long)i); + } + + LWLockRelease(lfc_lock); + + *n_entries = i; + return fs; +} + +/* + * Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock + * and avoid race conditions with other backends. + */ +static void +lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) +{ + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + int shard_no; + PrewarmRequest* ring; + size_t ring_size = pg_nextpower2_32(lfc_prewarm_batch); + + if (!lfc_ensure_opened()) + return; + + if (n_entries == 0 || fs == NULL) + { + elog(LOG, "LFC: prewarm is disabled"); + return; + } + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + if (lfc_ctl->limit <= lfc_ctl->size) + { + LWLockRelease(lfc_lock); + return; + } + + if (n_entries > lfc_ctl->limit - lfc_ctl->size) + { + n_entries = lfc_ctl->limit - lfc_ctl->size; + } + + LWLockRelease(lfc_lock); + + /* Initialize fields used to track prewarming progress */ + lfc_ctl->prewarm_total_chunks = n_entries; + lfc_ctl->prewarm_curr_chunk = 0; + + ring = (PrewarmRequest*)palloc(sizeof(PrewarmRequest)*ring_size); + + elog(LOG, "LFC: start loading %ld chunks", (long)n_entries); + + while (true) + { + size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK; + BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK; + if (chunk_no < n_entries) + { + if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))) + { + /* + * In case of prewarming replica we should be careful not to load too new version + * of the page - with LSN larger than current replay LSN. + * At primary we are always loading latest version. + */ + XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX; + + NeonGetPageRequest request = { + .hdr.tag = T_NeonGetPageRequest, + .hdr.lsn = req_lsn, + /* not_modified_since is filled in below */ + .rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key), + .forknum = fs[chunk_no].key.forkNum, + .blkno = fs[chunk_no].key.blockNum + offs_in_chunk, + }; + shard_no = get_shard_number(&fs[chunk_no].key); + request.hdr.not_modified_since = GetLastWrittenLSN(request.rinfo, request.forknum, request.blkno); + + while (!page_server->send(shard_no, (NeonRequest *) &request) + || !page_server->flush(shard_no)) + { + /* page server disconnected: all previusly sent prefetch requests are lost */ + n_sent = 0; + n_received = 0; + } + ring[n_sent & (ring_size-1)].reqid = request.hdr.reqid; + ring[n_sent & (ring_size-1)].lsn = request.hdr.lsn; + ring[n_sent & (ring_size-1)].not_modified_since = request.hdr.not_modified_since; + n_sent += 1; + } + snd_idx += 1; + } + if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries) + { + NRelFileInfo rinfo; + NeonResponse* resp; + PrewarmRequest* req = &ring[n_received & (ring_size-1)]; + + do + { + chunk_no = rcv_idx / BLOCKS_PER_CHUNK; + offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK; + rcv_idx += 1; + } while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))); + + shard_no = get_shard_number(&fs[chunk_no].key); + resp = page_server->receive(shard_no); + lfc_ctl->prewarm_curr_chunk = chunk_no; + rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key); + + switch (resp->tag) + { + case T_NeonGetPageResponse: + if (neon_protocol_version >= 3) + { + NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; + if (resp->reqid != req->reqid || + resp->lsn != req->lsn || + resp->not_modified_since != req->not_modified_since || + !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || + getpage_resp->req.forknum != fs[chunk_no].key.forkNum || + getpage_resp->req.blkno != fs[chunk_no].key.blockNum + offs_in_chunk) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, + req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since), RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk); + } + } + break; + case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != req->reqid || + resp->lsn != req->lsn || + resp->not_modified_since != req->not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since)); + } + } + /* Prefech can request page which is already dropped so PS can respond with error: just ignore it */ + elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s", + fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, ((NeonErrorResponse *) resp)->message); + goto next_block; + default: + elog(LOG, "LFC: unexpected response type: %d", resp->tag); + return; + } + + if (lfc_prefetch(rinfo, fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk, + ((NeonGetPageResponse*)resp)->page, req->not_modified_since)) + { + lfc_ctl->prewarmed_pages += 1; + } + else + { + lfc_ctl->skipped_pages += 1; + } + next_block: + if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK) + { + break; + } + } + } + Assert(n_sent == n_received); + pfree(ring); + lfc_ctl->prewarm_curr_chunk = n_entries; + elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received); +} + + /* * Check if page is present in the cache. * Returns true if page is found in local cache. @@ -1036,6 +1288,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; + if (!lfc_ensure_opened()) + return false; + CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forknum; @@ -1684,3 +1939,69 @@ approximate_working_set_size(PG_FUNCTION_ARGS) } PG_RETURN_NULL(); } + +PG_FUNCTION_INFO_V1(get_local_cache_state); + +Datum +get_local_cache_state(PG_FUNCTION_ARGS) +{ + size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); + FileCacheStateEntry* fs = lfc_get_state(&n_entries); + if (fs != NULL) + { + size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries; + bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes); + + SET_VARSIZE(res, VARHDRSZ + size_in_bytes); + memcpy(VARDATA(res), fs, size_in_bytes); + pfree(fs); + + PG_RETURN_BYTEA_P(res); + } + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(prewarm_local_cache); + +Datum +prewarm_local_cache(PG_FUNCTION_ARGS) +{ + bytea* state = PG_GETARG_BYTEA_PP(0); + uint32 n_entries = VARSIZE_ANY_EXHDR(state)/sizeof(FileCacheStateEntry); + FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state); + + lfc_prewarm(fs, n_entries); + + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(get_prewarm_info); + +Datum +get_prewarm_info(PG_FUNCTION_ARGS) +{ + Datum values[4]; + bool nulls[4]; + TupleDesc tupdesc; + + if (lfc_size_limit == 0) + PG_RETURN_NULL(); + + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + MemSet(nulls, 0, sizeof(nulls)); + LWLockAcquire(lfc_lock, LW_SHARED); + values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks); + values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk); + values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages); + values[3] = Int32GetDatum(lfc_ctl->skipped_pages); + LWLockRelease(lfc_lock); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 64d38e7913..42f3ef673a 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -48,7 +48,6 @@ #define MIN_RECONNECT_INTERVAL_USEC 1000 #define MAX_RECONNECT_INTERVAL_USEC 1000000 - enum NeonComputeMode { CP_MODE_PRIMARY = 0, CP_MODE_REPLICA, @@ -994,6 +993,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request) pageserver_conn = NULL; } + request->reqid = GENERATE_REQUEST_ID(); req_buff = nm_pack_request(request); /* diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql new file mode 100644 index 0000000000..c2f3895883 --- /dev/null +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -0,0 +1,22 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit + +CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer) +RETURNS record +AS 'MODULE_PATHNAME', 'get_prewarm_info' +LANGUAGE C STRICT +PARALLEL SAFE; + +CREATE FUNCTION get_local_cache_state(max_chunks integer default null) +RETURNS bytea +AS 'MODULE_PATHNAME', 'get_local_cache_state' +LANGUAGE C +PARALLEL UNSAFE; + +CREATE FUNCTION prewarm_local_cache(state bytea) +RETURNS void +AS 'MODULE_PATHNAME', 'prewarm_local_cache' +LANGUAGE C STRICT +PARALLEL UNSAFE; + + + diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql new file mode 100644 index 0000000000..0ff29933b8 --- /dev/null +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -0,0 +1,7 @@ +DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer); + +DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); + +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea); + + diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index a2e81feb5f..b6ec62bb89 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -56,7 +56,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL; (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) - extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 0ab539fe56..858dd8b85f 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -65,7 +65,6 @@ typedef enum { SLRU_MULTIXACT_OFFSETS } SlruKind; - /*-- * supertype of all the Neon*Request structs below. * diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py new file mode 100644 index 0000000000..a27f1423e5 --- /dev/null +++ b/test_runner/regress/test_lfc_prewarm.py @@ -0,0 +1,130 @@ +import random +import threading +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import USE_LFC + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + endpoint.stop() + endpoint.start() + + conn = endpoint.connect() + cur = conn.cursor() + time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version + cur.execute("alter extension neon update to '1.6'") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + lfc_used_pages = cur.fetchall()[0][0] + log.info(f"Used LFC size: {lfc_used_pages}") + cur.execute("select * from get_prewarm_info()") + prewarm_info = cur.fetchall()[0] + log.info(f"Prewarm info: {prewarm_info}") + log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%") + + assert lfc_used_pages > 10000 + assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1] + + cur.execute("select sum(pk) from t") + assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 + + assert prewarm_info[1] > 0 + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + n_threads = 4 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute( + "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 128))" + ) + cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + running = True + + def workload(): + conn = endpoint.connect() + cur = conn.cursor() + n_transfers = 0 + while running: + src = random.randint(1, n_records) + dst = random.randint(1, n_records) + cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) + cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + n_transfers += 1 + log.info(f"Number of transfers: {n_transfers}") + + def prewarm(): + conn = endpoint.connect() + cur = conn.cursor() + n_prewarms = 0 + while running: + cur.execute("alter system set neon.file_cache_size_limit='1MB'") + cur.execute("select pg_reload_conf()") + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + n_prewarms += 1 + log.info(f"Number of prewarms: {n_prewarms}") + + workload_threads = [] + for _ in range(n_threads): + t = threading.Thread(target=workload) + workload_threads.append(t) + t.start() + + prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread.start() + + time.sleep(100) + + running = False + for t in workload_threads: + t.join() + prewarm_thread.join() + + cur.execute("select sum(balance) from accounts") + total_balance = cur.fetchall()[0][0] + assert total_balance == 0