Implement prewarm using lfc_prefetch

This commit is contained in:
Konstantin Knizhnik
2025-01-30 18:04:28 +02:00
parent f370046e46
commit f98e54186f
9 changed files with 505 additions and 4 deletions

View File

@@ -36,6 +36,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -788,6 +788,27 @@ prefetch_read(PrefetchRequest *slot)
}
}
/*
* Wait completion of previosly registered prefetch request.
* Prefetch result should be placed in LFC by prefetch_wait_for.
*/
bool
prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
{
prefetch_set_unused(entry->slot->my_ring_index);
return true;
}
return false;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*

View File

@@ -95,6 +95,7 @@
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)
/*
* Blocks are read or written to LFC file outside LFC critical section.
@@ -119,10 +120,17 @@ typedef struct FileCacheEntry
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */
uint32 state[CHUNK_BITMAP_SIZE * 2]; /* two bits per block */
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
typedef struct PrewarmRequest
{
NeonRequestId reqid;
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} PrewarmRequest;
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
@@ -144,6 +152,10 @@ typedef struct FileCacheControl
uint64 time_write; /* time spent writing (us) */
uint64 resizes; /* number of LFC resizes */
uint64 evicted_pages; /* number of evicted pages */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
@@ -153,11 +165,19 @@ typedef struct FileCacheControl
bool lfc_store_prefetch_result;
typedef struct FileCacheStateEntry
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
@@ -508,6 +528,32 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -521,6 +567,212 @@ lfc_init(void)
#endif
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
fs = (FileCacheStateEntry*)palloc0(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
for (int j = 0; j < BLOCKS_PER_CHUNK; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
fs[i].bitmap[j >> 5] |= (uint32)1 << (j & 31);
}
if (++i == max_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
int shard_no;
PrewarmRequest* ring;
size_t ring_size = pg_nextpower2_32(lfc_prewarm_batch);
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
}
LWLockRelease(lfc_lock);
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
ring = (PrewarmRequest*)palloc(sizeof(PrewarmRequest)*ring_size);
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
while (true)
{
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
{
/*
* In case of prewarming replica we should be careful not to load too new version
* of the page - with LSN larger than current replay LSN.
* At primary we are always loading latest version.
*/
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
.hdr.lsn = req_lsn,
/* not_modified_since is filled in below */
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
.forknum = fs[chunk_no].key.forkNum,
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
};
shard_no = get_shard_number(&fs[chunk_no].key);
request.hdr.not_modified_since = GetLastWrittenLSN(request.rinfo, request.forknum, request.blkno);
while (!page_server->send(shard_no, (NeonRequest *) &request)
|| !page_server->flush(shard_no))
{
/* page server disconnected: all previusly sent prefetch requests are lost */
n_sent = 0;
n_received = 0;
}
ring[n_sent & (ring_size-1)].reqid = request.hdr.reqid;
ring[n_sent & (ring_size-1)].lsn = request.hdr.lsn;
ring[n_sent & (ring_size-1)].not_modified_since = request.hdr.not_modified_since;
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
{
NRelFileInfo rinfo;
NeonResponse* resp;
PrewarmRequest* req = &ring[n_received & (ring_size-1)];
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
shard_no = get_shard_number(&fs[chunk_no].key);
resp = page_server->receive(shard_no);
lfc_ctl->prewarm_curr_chunk = chunk_no;
rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key);
switch (resp->tag)
{
case T_NeonGetPageResponse:
if (neon_protocol_version >= 3)
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
if (resp->reqid != req->reqid ||
resp->lsn != req->lsn ||
resp->not_modified_since != req->not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != fs[chunk_no].key.forkNum ||
getpage_resp->req.blkno != fs[chunk_no].key.blockNum + offs_in_chunk)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since), RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk);
}
}
break;
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (resp->reqid != req->reqid ||
resp->lsn != req->lsn ||
resp->not_modified_since != req->not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since));
}
}
/* Prefech can request page which is already dropped so PS can respond with error: just ignore it */
elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s",
fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, ((NeonErrorResponse *) resp)->message);
goto next_block;
default:
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
return;
}
if (lfc_prefetch(rinfo, fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk,
((NeonGetPageResponse*)resp)->page, req->not_modified_since))
{
lfc_ctl->prewarmed_pages += 1;
}
else
{
lfc_ctl->skipped_pages += 1;
}
next_block:
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
{
break;
}
}
}
Assert(n_sent == n_received);
pfree(ring);
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -1036,6 +1288,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
if (!lfc_ensure_opened())
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
@@ -1684,3 +1939,69 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
if (fs != NULL)
{
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
PG_RETURN_BYTEA_P(res);
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state)/sizeof(FileCacheStateEntry);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -48,7 +48,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
@@ -994,6 +993,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
pageserver_conn = NULL;
}
request->reqid = GENERATE_REQUEST_ID();
req_buff = nm_pack_request(request);
/*

View File

@@ -0,0 +1,22 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,7 @@
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);

View File

@@ -56,7 +56,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

View File

@@ -65,7 +65,6 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*--
* supertype of all the Neon*Request structs below.
*

View File

@@ -0,0 +1,130 @@
import random
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
assert lfc_used_pages > 10000
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
assert prewarm_info[1] > 0
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 128))"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
running = True
def workload():
conn = endpoint.connect()
cur = conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
workload_threads = []
for _ in range(n_threads):
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
time.sleep(100)
running = False
for t in workload_threads:
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
assert total_balance == 0