Prewarm implementation (#11741)

## Problem

Continue work on prewarm started in PR
https://github.com/neondatabase/neon/pull/11740

## Summary of changes

Implement prewarm using prefetch

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2025-04-28 21:17:03 +03:00
committed by GitHub
parent 0482690534
commit 6d6b83e737
2 changed files with 397 additions and 6 deletions

View File

@@ -98,7 +98,6 @@
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
/*
@@ -135,6 +134,15 @@ typedef struct FileCacheEntry
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 prewarmed_pages;
uint32 skipped_pages;
TimestampTz completed;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -156,25 +164,43 @@ typedef struct FileCacheControl
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
size_t total_prewarm_pages;
size_t prewarm_batch;
bool prewarm_active;
bool prewarm_canceled;
dsm_handle prewarm_lfc_state_handle;
} FileCacheControl;
bool lfc_store_prefetch_result;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -500,6 +526,17 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.prewarm_update_ws_estimation",
"Consider prewarmed pages for working set estimation",
NULL,
&lfc_prewarm_update_ws_estimation,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -550,6 +587,32 @@ lfc_init(void)
lfc_change_chunk_size,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed chunks",
NULL,
&lfc_prewarm_limit,
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -563,6 +626,314 @@ lfc_init(void)
#endif
}
FileCacheState*
lfc_get_state(size_t max_entries)
{
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_pages = 0;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
fcs = (FileCacheState*)palloc0(state_size);
SET_VARSIZE(fcs, state_size);
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
break;
}
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
return fcs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
void
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
{
size_t fcs_chunk_size_log;
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
if (!lfc_ensure_opened())
return;
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (n_workers > MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
}
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_size = VARSIZE(fcs);
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
{
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (lfc_ctl->prewarm_active)
{
LWLockRelease(lfc_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
lfc_ctl->n_prewarm_entries = n_entries;
lfc_ctl->n_prewarm_workers = n_workers;
lfc_ctl->prewarm_active = true;
lfc_ctl->prewarm_canceled = false;
lfc_ctl->prewarm_batch = prewarm_batch;
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
LWLockRelease(lfc_lock);
/* Calculate total number of pages to be prewarmed */
lfc_ctl->total_prewarm_pages = fcs->n_pages;
seg = dsm_create(fcs_size, 0);
memcpy(dsm_segment_address(seg), fcs, fcs_size);
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
/* Spawn background workers */
for (uint32 i = 0; i < n_workers; i++)
{
BackgroundWorker worker = {0};
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_library_name, "neon");
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1);
strcpy(worker.bgw_type, "LFC prewarm worker");
worker.bgw_main_arg = Int32GetDatum(i);
/* must set notify PID to wait for shutdown */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i]))
{
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("LFC: registering dynamic bgworker prewarm failed"),
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
n_workers = i;
lfc_ctl->prewarm_canceled = true;
break;
}
}
for (uint32 i = 0; i < n_workers; i++)
{
while (true)
{
PG_TRY();
{
BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED)
{
elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
}
break;
}
PG_CATCH();
{
elog(LOG, "LFC: cancel prewarm");
lfc_ctl->prewarm_canceled = true;
}
PG_END_TRY();
}
if (!lfc_ctl->prewarm_workers[i].completed)
{
/* Background worker doesn't set completion time: it means that it was abnormally terminated */
elog(LOG, "LFC: prewarm worker %d failed", i+1);
/* Set completion time to prevent get_prewarm_info from considering this worker as active */
lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp();
}
}
dsm_detach(seg);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->prewarm_active = false;
LWLockRelease(lfc_lock);
}
void
lfc_prewarm_main(Datum main_arg)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t max_prefetch_pages;
size_t prewarm_batch;
size_t n_workers;
dsm_segment *seg;
FileCacheState* fcs;
uint8* bitmap;
BufferTag tag;
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment")));
fcs = (FileCacheState*) dsm_segment_address(seg);
prewarm_batch = lfc_ctl->prewarm_batch;
fcs_chunk_size_log = fcs->chunk_size_log;
n_workers = lfc_ctl->n_prewarm_workers;
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
ws = &lfc_ctl->prewarm_workers[worker_id];
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
/* enable prefetch in LFC */
lfc_store_prefetch_result = true;
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
elog(LOG, "LFC: worker %d start prewarming", worker_id);
while (!lfc_ctl->prewarm_canceled)
{
if (snd_idx < max_prefetch_pages)
{
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
else
{
ws->skipped_pages += 1;
BITMAP_CLR(bitmap, snd_idx);
}
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
{
if (n_received == n_sent && snd_idx == max_prefetch_pages)
{
break;
}
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
n_received += 1;
}
}
/* No need to perform prefetch cleanup here because prewarm worker will be terminated and
* connection to PS dropped just after return from this function.
*/
Assert(n_sent == n_received || lfc_ctl->prewarm_canceled);
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -1001,8 +1372,11 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*
* While prewarming LFC we do not want to replace existed entries,
* so we just stop prewarm is LFC cache is full.
*/
else if (!dlist_is_empty(&lfc_ctl->lru))
else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm)
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
@@ -1026,6 +1400,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
return false;
}
@@ -1112,9 +1487,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
state = GET_STATE(entry, chunk_offs);

View File

@@ -13,6 +13,17 @@
#include "neon_pgversioncompat.h"
typedef struct FileCacheState
{
int32 vl_len_; /* varlena header (do not touch directly!) */
uint32 magic;
uint32 n_chunks;
uint32 n_pages;
uint16 chunk_size_log;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
/* GUCs */
extern bool lfc_store_prefetch_result;
@@ -32,7 +43,10 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,