mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Add prewarm_local_cache and get_local_cache_state functions
This commit is contained in:
@@ -143,8 +143,6 @@ typedef struct FileCacheStateEntry
|
||||
uint32 bitmap[CHUNK_BITMAP_SIZE];
|
||||
} FileCacheStateEntry;
|
||||
|
||||
static FileCacheStateEntry* lfc_state;
|
||||
static size_t lfc_state_size;
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = 0;
|
||||
static LWLockId lfc_lock;
|
||||
@@ -160,7 +158,6 @@ static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
static CustomCheckpointHookType PrevCheckpointHook;
|
||||
|
||||
static void lfc_init_prewarm(void);
|
||||
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
@@ -339,11 +336,6 @@ lfc_shmem_startup(void)
|
||||
{
|
||||
close(fd);
|
||||
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
|
||||
/* Prewarming of replica has no sense because if WAL record's target page is not present in shared buffer, then correspondent LFC entry is invalidated */
|
||||
if (LFC_ENABLED() && lfc_prewarm_limit != 0)
|
||||
{
|
||||
lfc_init_prewarm();
|
||||
}
|
||||
}
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
@@ -545,23 +537,15 @@ lfc_init(void)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
|
||||
* This function saves first mostrecently used pages.
|
||||
* It is expected to be called at shutdown checkpoint by checkpointer.
|
||||
*/
|
||||
void
|
||||
lfc_save_state(void)
|
||||
static FileCacheStateEntry*
|
||||
lfc_get_state(size_t* n_entries)
|
||||
{
|
||||
size_t i = 0, max_entries = lfc_prewarm_limit;
|
||||
FileCacheStateEntry* fs;
|
||||
|
||||
if (max_entries == 0)
|
||||
return;
|
||||
|
||||
fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
size_t max_entries = *n_entries;
|
||||
size_t i = 0;
|
||||
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
dlist_iter iter;
|
||||
@@ -575,83 +559,43 @@ lfc_save_state(void)
|
||||
}
|
||||
elog(LOG, "LFC: save state of %ld chunks", (long)i);
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
if (i != 0)
|
||||
*n_entries = i;
|
||||
return fs;
|
||||
}
|
||||
|
||||
/*
|
||||
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
|
||||
* This function saves the most recently used pages first.
|
||||
* It is expected to be called at shutdown checkpoint by checkpointer.
|
||||
*/
|
||||
void
|
||||
lfc_save_state(void)
|
||||
{
|
||||
size_t n_entries = lfc_prewarm_limit;
|
||||
FileCacheStateEntry* fs;
|
||||
|
||||
if (n_entries == 0)
|
||||
return;
|
||||
|
||||
fs = lfc_get_state(&n_entries);
|
||||
if (n_entries != 0)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false));
|
||||
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
|
||||
#else
|
||||
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false, true);
|
||||
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
|
||||
#endif
|
||||
}
|
||||
pfree(fs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Load LFC state and enter entries in hash table.
|
||||
* It is needed to track modification of prewarmed pages.
|
||||
* All such entries have `prewarm` flag set. When entry is updated (some backed reads or writes
|
||||
* some pages from this chunk), then `prewarm` flag is cleared, prohibiting prefetch for this chunk.
|
||||
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
|
||||
* This function is called while LFC initialization: no synchronization is needed.
|
||||
*/
|
||||
static void
|
||||
lfc_init_prewarm(void)
|
||||
{
|
||||
FileCacheStateEntry* fs;
|
||||
ssize_t rc;
|
||||
size_t i, max_entries = lfc_prewarm_limit;
|
||||
uint32_t hash;
|
||||
FileCacheEntry *entry;
|
||||
int fd;
|
||||
|
||||
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(LOG, "LFC: state file is missing");
|
||||
return;
|
||||
}
|
||||
|
||||
fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
|
||||
if (rc <= 0)
|
||||
{
|
||||
elog(LOG, "LFC: Failed to read state file: %m");
|
||||
CloseTransientFile(fd);
|
||||
free(fs);
|
||||
return;
|
||||
}
|
||||
CloseTransientFile(fd);
|
||||
|
||||
/* Do not try to load more than fits in LFC */
|
||||
max_entries = Min(rc / sizeof(FileCacheStateEntry), lfc_ctl->limit);
|
||||
lfc_ctl->prewarm_total_chunks = max_entries;
|
||||
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
|
||||
|
||||
for (i = 0; i < max_entries; i++)
|
||||
{
|
||||
hash = get_hash_value(lfc_hash, &fs[i].key);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, NULL);
|
||||
entry->offset = i;
|
||||
entry->hash = hash;
|
||||
entry->access_count = 0;
|
||||
entry->prewarm_requested = true;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
/* Most recently visted pages are stored first */
|
||||
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
|
||||
}
|
||||
Assert(lfc_ctl->size == 0);
|
||||
lfc_ctl->used = lfc_ctl->size = max_entries;
|
||||
lfc_state = fs;
|
||||
lfc_state_size = max_entries;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Load pages from saved LFC state.
|
||||
* Prewarm LFC cache to the specified state.
|
||||
*
|
||||
* Load is done by backgraound work. It can interfere with
|
||||
* Prewarming can interfere with
|
||||
* access to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
|
||||
* is reading a page, it pins the shared buffer and enforces that only one backend is reading it, while the others wait for the read to complete.
|
||||
*
|
||||
@@ -660,8 +604,8 @@ lfc_init_prewarm(void)
|
||||
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
|
||||
* Certainly we can not rely on disk content in this case.
|
||||
*
|
||||
* To solve this problem with use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set by `lfc_init_prewarm` when it loads saved LFC state.
|
||||
* Prewarm BGW perform write to LFC file only if this flag is set. This flag is cleared if any other backend perform write to this LFC chunk.
|
||||
* To solve this problem we use two flags in the LFC entry: `prewarm_requested` and `prewarm_started`. The first is set before prewarm is actually started.
|
||||
* lfc_prewarm writes to the LFC file only if this flag is set. This flag is cleared if any other backend performs a write to this LFC chunk.
|
||||
* In this case the data loaded by prewarm is considered to be stale and should just be ignored.
|
||||
*
|
||||
* But as far as a write to LFC is performed without holding the lock, there is no guarantee that such a write is in progress.
|
||||
@@ -669,8 +613,9 @@ lfc_init_prewarm(void)
|
||||
* Any other backend writing to LFC should abandon its write to the LFC file (just not mark the page as loaded in the bitmap) once it sees this flag.
|
||||
* So neither the prewarm BGW nor the backend saves the page in LFC - it is just skipped.
|
||||
*/
|
||||
void
|
||||
lfc_load_pages(void)
|
||||
|
||||
static void
|
||||
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
{
|
||||
ssize_t rc;
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
@@ -679,25 +624,70 @@ lfc_load_pages(void)
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
uint32 hash;
|
||||
size_t i;
|
||||
bool found;
|
||||
int shard_no;
|
||||
FileCacheStateEntry* fs = lfc_state;
|
||||
size_t max_entries = lfc_state_size;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
if (max_entries == 0 || fs == NULL)
|
||||
if (n_entries == 0 || fs == NULL)
|
||||
{
|
||||
elog(LOG, "LFC: prewarm is disabled");
|
||||
return;
|
||||
}
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)max_entries);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
/* Do not prewarm more entries than LFC limit */
|
||||
if (lfc_ctl->limit <= lfc_ctl->size)
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
|
||||
{
|
||||
n_entries = lfc_ctl->limit - lfc_ctl->size;
|
||||
}
|
||||
|
||||
lfc_ctl->prewarm_total_chunks = n_entries;
|
||||
lfc_ctl->prewarm_curr_chunk = 0;
|
||||
|
||||
/*
|
||||
* Load LFC state and enter entries in hash table.
|
||||
* It is needed to track modification of prewarmed pages.
|
||||
* All such entries have `prewarm` flag set. When entry is updated (some backend reads or writes
|
||||
* some pages from this chunk), then `prewarm` flag is cleared, prohibiting prefetch for this chunk.
|
||||
* It prevents overwriting a page updated or loaded by a backend with an older one loaded by prewarm.
|
||||
* The entries are inserted while lfc_lock is held in exclusive mode.
|
||||
*/
|
||||
for (i = 0; i < n_entries; i++)
|
||||
{
|
||||
hash = get_hash_value(lfc_hash, &fs[i].key);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
|
||||
/* Do not prewarm chunks which are already present in LFC */
|
||||
if (!found)
|
||||
{
|
||||
entry->offset = lfc_ctl->size++;
|
||||
entry->hash = hash;
|
||||
entry->access_count = 0;
|
||||
entry->prewarm_requested = true;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
/* Most recently visited pages are stored first */
|
||||
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
|
||||
lfc_ctl->used += 1;
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
|
||||
|
||||
while (true)
|
||||
{
|
||||
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
|
||||
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
|
||||
if (chunk_no < max_entries)
|
||||
if (chunk_no < n_entries)
|
||||
{
|
||||
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
|
||||
{
|
||||
@@ -727,7 +717,7 @@ lfc_load_pages(void)
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == max_entries)
|
||||
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
|
||||
{
|
||||
NeonResponse * resp;
|
||||
do
|
||||
@@ -810,12 +800,47 @@ lfc_load_pages(void)
|
||||
}
|
||||
}
|
||||
}
|
||||
lfc_ctl->prewarm_curr_chunk = max_entries;
|
||||
free(fs);
|
||||
lfc_ctl->prewarm_curr_chunk = n_entries;
|
||||
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Load pages from LFC state saved in AUX file.
|
||||
*/
|
||||
void
|
||||
lfc_load_pages(void)
|
||||
{
|
||||
int fd;
|
||||
FileCacheStateEntry *fs;
|
||||
ssize_t rc;
|
||||
size_t max_entries = lfc_prewarm_limit;
|
||||
|
||||
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(LOG, "LFC: state file is missing");
|
||||
return;
|
||||
}
|
||||
|
||||
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
|
||||
if (rc <= 0)
|
||||
{
|
||||
elog(LOG, "LFC: Failed to read state file: %m");
|
||||
CloseTransientFile(fd);
|
||||
free(fs);
|
||||
return;
|
||||
}
|
||||
CloseTransientFile(fd);
|
||||
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
|
||||
|
||||
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
|
||||
|
||||
pfree(fs);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check if page is present in the cache.
|
||||
* Returns true if page is found in local cache.
|
||||
@@ -1725,6 +1750,37 @@ save_local_cache_state(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_local_cache_state);
|
||||
|
||||
Datum
|
||||
get_local_cache_state(PG_FUNCTION_ARGS)
|
||||
{
|
||||
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
|
||||
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
|
||||
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
|
||||
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
|
||||
|
||||
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
|
||||
memcpy(VARDATA(res), fs, size_in_bytes);
|
||||
pfree(fs);
|
||||
|
||||
PG_RETURN_BYTEA_P(res);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(prewarm_local_cache);
|
||||
|
||||
Datum
|
||||
prewarm_local_cache(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bytea* state = PG_GETARG_BYTEA_PP(0);
|
||||
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
|
||||
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
|
||||
|
||||
lfc_prewarm(fs, n_entries);
|
||||
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_prewarm_info);
|
||||
|
||||
Datum
|
||||
|
||||
@@ -12,5 +12,17 @@ AS 'MODULE_PATHNAME', 'get_prewarm_info'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
|
||||
RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_local_cache_state'
|
||||
LANGUAGE C
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION prewarm_local_cache(state bytea)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
DROP FUNCTION IF EXISTS save_local_cache_state();
|
||||
|
||||
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user