LFC: prewarm local file cache on demand

Replace prewarming at LFC initialization (lfc_init_prewarm) with explicit API:
  - lfc_get_state() collects state of cached chunks,
  - lfc_prewarm() registers chunks of the given state in LFC and loads their pages,
  - lfc_load_pages() reads saved state from "lfc.state" file and passes it to lfc_prewarm(),
  - SQL functions get_local_cache_state() and prewarm_local_cache() expose this API.

Changes made on review (only in added lines):
  - lfc_load_pages: state buffer is palloc-ed, so it is released with pfree() (not free()) on read failure;
  - prewarm_local_cache: number of entries (not size in bytes) is passed to lfc_prewarm,
    and bytea argument is detoasted so that array of entries is properly aligned;
  - get_local_cache_state: lfc_get_state() is not called with zero or negative number of chunks;
  - comment of the registration loop in lfc_prewarm no longer claims that no synchronization is needed.

Note: line breaks and indentation of this patch were reconstructed, so context may differ
in whitespace from the target file: use `git apply --ignore-whitespace`.

diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c
index ec5a4973c8..f792ef6c27 100644
--- a/pgxn/neon/file_cache.c
+++ b/pgxn/neon/file_cache.c
@@ -143,8 +143,6 @@ typedef struct FileCacheStateEntry
 	uint32 bitmap[CHUNK_BITMAP_SIZE];
 } FileCacheStateEntry;
 
-static FileCacheStateEntry* lfc_state;
-static size_t lfc_state_size;
 static HTAB *lfc_hash;
 static int lfc_desc = 0;
 static LWLockId lfc_lock;
@@ -160,7 +158,6 @@ static shmem_request_hook_type prev_shmem_request_hook;
 #endif
 static CustomCheckpointHookType PrevCheckpointHook;
 
-static void lfc_init_prewarm(void);
 
 #define LFC_ENABLED() (lfc_ctl->limit != 0)
 
@@ -339,11 +336,6 @@ lfc_shmem_startup(void)
 		{
 			close(fd);
 			lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
-			/* Prewarming of replica has no sense because if WAL record's target page is not present in shared buffer, then correspondent LFC entry is invalidated */
-			if (LFC_ENABLED() && lfc_prewarm_limit != 0)
-			{
-				lfc_init_prewarm();
-			}
 		}
 	}
 	LWLockRelease(AddinShmemInitLock);
@@ -545,23 +537,20 @@ lfc_init(void)
 	}
 }
 
-/*
- * Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
- * This function saves first mostrecently used pages.
- * It is expected to be called at shutdown checkpoint by checkpointer.
- */
-void
-lfc_save_state(void)
+/*
+ * Collect state of LFC chunks into palloc-ed array with capacity *n_entries (input value).
+ * On return *n_entries is set to the number of collected chunks (0 if LFC is disabled).
+ * It is responsibility of the caller to pfree the result.
+ */
+static FileCacheStateEntry*
+lfc_get_state(size_t* n_entries)
 {
-	size_t i = 0, max_entries = lfc_prewarm_limit;
-	FileCacheStateEntry* fs;
-
-	if (max_entries == 0)
-		return;
-
-	fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
+	size_t max_entries = *n_entries;
+	size_t i = 0;
+	FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
 
 	LWLockAcquire(lfc_lock, LW_SHARED);
+
 	if (LFC_ENABLED())
 	{
 		dlist_iter iter;
@@ -575,83 +564,43 @@ lfc_save_state(void)
 		}
 		elog(LOG, "LFC: save state of %ld chunks", (long)i);
 	}
+
 	LWLockRelease(lfc_lock);
 
-	if (i != 0)
+	*n_entries = i;
+	return fs;
+}
+
+/*
+ * Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
+ * This function saves first most recently used pages.
+ * It is expected to be called at shutdown checkpoint by checkpointer.
+ */
+void
+lfc_save_state(void)
+{
+	size_t n_entries = lfc_prewarm_limit;
+	FileCacheStateEntry* fs;
+
+	if (n_entries == 0)
+		return;
+
+	fs = lfc_get_state(&n_entries);
+	if (n_entries != 0)
 	{
 #if PG_MAJORVERSION_NUM < 17
-		XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false));
+		XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
 #else
-		LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * i, false, true);
+		LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
 #endif
 	}
+	pfree(fs);
 }
 
 /*
- * Load LFC state and enter entries in hash table.
- * It is needed to track modification of prewarmed pages.
- * All such entries have `prewarm` flag set. When entry is updated (some backed reads or writes
- * some pages from this chunk), then `prewarm` flag is cleared, prohibiting prefetch for this chunk.
- * It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
- * This function is called while LFC initialization: no synchronization is needed.
- */
-static void
-lfc_init_prewarm(void)
-{
-	FileCacheStateEntry* fs;
-	ssize_t rc;
-	size_t i, max_entries = lfc_prewarm_limit;
-	uint32_t hash;
-	FileCacheEntry *entry;
-	int fd;
-
-	fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		elog(LOG, "LFC: state file is missing");
-		return;
-	}
-
-	fs = (FileCacheStateEntry*)malloc(sizeof(FileCacheStateEntry) * max_entries);
-	rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
-	if (rc <= 0)
-	{
-		elog(LOG, "LFC: Failed to read state file: %m");
-		CloseTransientFile(fd);
-		free(fs);
-		return;
-	}
-	CloseTransientFile(fd);
-
-	/* Do not try to load more than fits in LFC */
-	max_entries = Min(rc / sizeof(FileCacheStateEntry), lfc_ctl->limit);
-	lfc_ctl->prewarm_total_chunks = max_entries;
-	elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
-
-	for (i = 0; i < max_entries; i++)
-	{
-		hash = get_hash_value(lfc_hash, &fs[i].key);
-		entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, NULL);
-		entry->offset = i;
-		entry->hash = hash;
-		entry->access_count = 0;
-		entry->prewarm_requested = true;
-		entry->prewarm_started = false;
-		memset(entry->bitmap, 0, sizeof entry->bitmap);
-		/* Most recently visted pages are stored first */
-		dlist_push_head(&lfc_ctl->lru, &entry->list_node);
-	}
-	Assert(lfc_ctl->size == 0);
-	lfc_ctl->used = lfc_ctl->size = max_entries;
-	lfc_state = fs;
-	lfc_state_size = max_entries;
-}
-
-
-/*
- * Load pages from saved LFC state.
+ * Prewarm LFC cache to the specified state.
  *
- * Load is done by backgraound work. It can interfere with
+ * Prewarming can interfere with
  * accessed to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
  * is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting read completion.
  *
@@ -660,8 +609,8 @@ lfc_init_prewarm(void)
  * is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
  * Certainly we can not rely on disk content in this case.
  *
- * To solve this problem with use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set by `lfc_init_prewarm` when it loads saved LFC state.
- * Prewarm BGW perform write to LFC file only if this flag is set. This flag is cleared if any other backend perform write to this LFC chunk.
+ * To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
+ * lfc_prewarm writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
  * In this case data loaded by prewarm BGW is considered to be deteriorated and should be just ignored.
  *
  * But as bat as far as write to LFC is performed without holding lock, there is no guarantee that such write is in progress.
@@ -669,8 +618,9 @@ lfc_init_prewarm(void)
  * Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) once it sees this flag.
  * So nether prewarm BGW, nether backend are saving page in LFC - it is just skipped.
  */
-void
-lfc_load_pages(void)
+
+static void
+lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
 {
 	ssize_t rc;
 	size_t snd_idx = 0, rcv_idx = 0;
@@ -679,25 +629,70 @@ lfc_load_pages(void)
 	uint64 generation;
 	uint32 entry_offset;
 	uint32 hash;
+	size_t i;
+	bool found;
 	int shard_no;
-	FileCacheStateEntry* fs = lfc_state;
-	size_t max_entries = lfc_state_size;
 
 	if (!lfc_ensure_opened())
 		return;
 
-	if (max_entries == 0 || fs == NULL)
+	if (n_entries == 0 || fs == NULL)
 	{
 		elog(LOG, "LFC: prewarm is disabled");
 		return;
 	}
-	elog(LOG, "LFC: start loading %ld chunks", (long)max_entries);
+
+	LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
+	/* Do not prewarm more entries than LFC limit */
+	if (lfc_ctl->limit <= lfc_ctl->size)
+	{
+		LWLockRelease(lfc_lock);
+		return;
+	}
+	if (n_entries > lfc_ctl->limit - lfc_ctl->size)
+	{
+		n_entries = lfc_ctl->limit - lfc_ctl->size;
+	}
+
+	lfc_ctl->prewarm_total_chunks = n_entries;
+	lfc_ctl->prewarm_curr_chunk = 0;
+
+	/*
+	 * Load LFC state and enter entries in hash table.
+	 * It is needed to track modification of prewarmed pages.
+	 * All such entries have `prewarm_requested` flag set. When entry is updated (some backend reads or writes
+	 * some pages from this chunk), then this flag is cleared, prohibiting prefetch for this chunk.
+	 * It prevents overwriting page updated or loaded by backend with older one, loaded by prewarm.
+	 * Entries are inserted while holding lfc_lock in exclusive mode.
+	 */
+	for (i = 0; i < n_entries; i++)
+	{
+		hash = get_hash_value(lfc_hash, &fs[i].key);
+		entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
+		/* Do not prewarm chunks which are already present in LFC */
+		if (!found)
+		{
+			entry->offset = lfc_ctl->size++;
+			entry->hash = hash;
+			entry->access_count = 0;
+			entry->prewarm_requested = true;
+			entry->prewarm_started = false;
+			memset(entry->bitmap, 0, sizeof entry->bitmap);
+			/* Most recently visited pages are stored first */
+			dlist_push_head(&lfc_ctl->lru, &entry->list_node);
+			lfc_ctl->used += 1;
+		}
+	}
+	LWLockRelease(lfc_lock);
+
+	elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
 
 	while (true)
 	{
 		size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
 		size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
-		if (chunk_no < max_entries)
+		if (chunk_no < n_entries)
 		{
 			if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
 			{
@@ -727,7 +722,7 @@ lfc_load_pages(void)
 			}
 			snd_idx += 1;
 		}
-		if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == max_entries)
+		if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
 		{
 			NeonResponse * resp;
 			do
@@ -810,12 +805,47 @@ lfc_load_pages(void)
 			}
 		}
 	}
-	lfc_ctl->prewarm_curr_chunk = max_entries;
-	free(fs);
+	lfc_ctl->prewarm_curr_chunk = n_entries;
 	elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
 }
 
 
+/*
+ * Load pages from LFC state saved in AUX file.
+ */
+void
+lfc_load_pages(void)
+{
+	int fd;
+	FileCacheStateEntry *fs;
+	ssize_t rc;
+	size_t max_entries = lfc_prewarm_limit;
+
+	fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
+	if (fd < 0)
+	{
+		elog(LOG, "LFC: state file is missing");
+		return;
+	}
+
+	fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
+	rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
+	if (rc <= 0)
+	{
+		elog(LOG, "LFC: Failed to read state file: %m");
+		CloseTransientFile(fd);
+		pfree(fs);
+		return;
+	}
+	CloseTransientFile(fd);
+	elog(LOG, "LFC: read state with %ld entries", (long)(rc / sizeof(FileCacheStateEntry)));
+
+	lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
+
+	pfree(fs);
+}
+
+
 /*
  * Check if page is present in the cache.
  * Returns true if page is found in local cache.
@@ -1725,6 +1755,59 @@ save_local_cache_state(PG_FUNCTION_ARGS)
 	PG_RETURN_NULL();
 }
 
+/*
+ * Return state of LFC (collected by lfc_get_state, as in lfc_save_state) as bytea.
+ * Optional argument limits number of returned chunks, by default lfc_prewarm_limit is used.
+ */
+PG_FUNCTION_INFO_V1(get_local_cache_state);
+
+Datum
+get_local_cache_state(PG_FUNCTION_ARGS)
+{
+	int32 max_chunks = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
+	size_t n_entries = max_chunks > 0 ? (size_t)max_chunks : 0;
+	FileCacheStateEntry* fs = NULL;
+	size_t size_in_bytes;
+	bytea* res;
+
+	/*
+	 * Like lfc_save_state, do not call lfc_get_state with zero capacity.
+	 * Negative max_chunks is treated as zero instead of being converted to huge size_t.
+	 */
+	if (n_entries != 0)
+		fs = lfc_get_state(&n_entries);
+
+	size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
+	res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
+	SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
+	if (fs != NULL)
+	{
+		memcpy(VARDATA(res), fs, size_in_bytes);
+		pfree(fs);
+	}
+
+	PG_RETURN_BYTEA_P(res);
+}
+
+/*
+ * Prewarm LFC using state in the format produced by get_local_cache_state.
+ */
+PG_FUNCTION_INFO_V1(prewarm_local_cache);
+
+Datum
+prewarm_local_cache(PG_FUNCTION_ARGS)
+{
+	/* Do not use packed (_PP) accessor: with short varlena header array of entries would be misaligned */
+	bytea* state = PG_GETARG_BYTEA_P(0);
+	/* Size of bytea is in bytes, while lfc_prewarm expects number of entries */
+	size_t n_entries = VARSIZE_ANY_EXHDR(state) / sizeof(FileCacheStateEntry);
+	FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
+
+	lfc_prewarm(fs, n_entries);
+
+	PG_RETURN_NULL();
+}
+
 PG_FUNCTION_INFO_V1(get_prewarm_info);
 
 Datum
diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql
index 501e4c4e12..77222ddd8e 100644
--- a/pgxn/neon/neon--1.5--1.6.sql
+++ b/pgxn/neon/neon--1.5--1.6.sql
@@ -12,5 +12,18 @@ AS 'MODULE_PATHNAME', 'get_prewarm_info'
 LANGUAGE C STRICT
 PARALLEL SAFE;
 
+CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
+RETURNS bytea
+AS 'MODULE_PATHNAME', 'get_local_cache_state'
+LANGUAGE C
+PARALLEL UNSAFE;
+
+CREATE FUNCTION prewarm_local_cache(state bytea)
+RETURNS void
+AS 'MODULE_PATHNAME', 'prewarm_local_cache'
+LANGUAGE C STRICT
+PARALLEL UNSAFE;
+
+
 
 
diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql
index 3d9f523541..8eb3257fc9 100644
--- a/pgxn/neon/neon--1.6--1.5.sql
+++ b/pgxn/neon/neon--1.6--1.5.sql
@@ -1,3 +1,9 @@
 DROP FUNCTION IF EXISTS save_local_cache_state();
 
 DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
+
+DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
+
+DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);
+
+