Compare commits

...

14 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8852e8a766 Fix prewarming terminatoin condition 2024-10-27 14:14:06 +02:00
Konstantin Knizhnik
c04ae556c5 Fix comments and other minor refactoring 2024-10-27 09:08:40 +02:00
Konstantin Knizhnik
44b283e107 Increase timeout in test_lfc_prewarm 2024-10-26 08:22:53 +03:00
Konstantin Knizhnik
c7a3359edd Add prewarm_local_cache and get_local_cache_state functions 2024-10-25 23:05:07 +03:00
Konstantin Knizhnik
012a8a360f Fix get_prewarm_info() 2024-10-25 10:11:58 +03:00
Konstantin Knizhnik
0b42695983 Report prewarm progress 2024-10-25 08:26:02 +03:00
Konstantin Knizhnik
284d7b4da6 Report prewarm progress 2024-10-25 08:26:00 +03:00
Konstantin Knizhnik
361fc04cd6 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
12f635aa04 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
ac6c53b94b Support prewarming of replica 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
df289738b8 Explain why we do not want to perform prewarm of LFC at replica and how it is avoided now 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
0d3503a187 Check for number of used pages rather than chunks in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f328d497e1 Wait LFC prewarm completion in the loop in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f971c3a786 Implement LFC prewarm 2024-10-25 08:24:44 +03:00
13 changed files with 600 additions and 27 deletions

View File

@@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_LFC_STATE: u8 = 0x04;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("lfc.state") {
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -32,6 +32,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -22,6 +22,7 @@
#include "neon_pgversioncompat.h"
#include "access/parallel.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
@@ -30,22 +31,28 @@
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "replication/message.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -100,7 +107,9 @@ typedef struct FileCacheEntry
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 30;
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
@@ -118,26 +127,57 @@ typedef struct FileCacheControl
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
typedef struct FileCacheStateEntry
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static CustomCheckpointHookType PrevCheckpointHook;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void LfcPrewarmMain(Datum main_arg);
static void
LfcCheckpointHook(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
{
lfc_save_state();
}
if (PrevCheckpointHook)
{
PrevCheckpointHook(flags);
}
}
/*
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
@@ -149,7 +189,7 @@ lfc_disable(char const *op)
{
int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -184,7 +224,7 @@ lfc_disable(char const *op)
pgstat_report_wait_end();
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
}
}
@@ -196,7 +236,7 @@ lfc_disable(char const *op)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
@@ -236,6 +276,17 @@ lfc_ensure_opened(void)
return enabled;
}
PGDLLEXPORT void
LfcPrewarmMain(Datum main_arg)
{
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
lfc_load_pages();
}
static void
lfc_shmem_startup(void)
{
@@ -267,14 +318,7 @@ lfc_shmem_startup(void)
n_chunks + 1, n_chunks + 1,
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
memset(lfc_ctl, 0, sizeof *lfc_ctl);
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -285,7 +329,7 @@ lfc_shmem_startup(void)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
@@ -295,6 +339,9 @@ lfc_shmem_startup(void)
}
}
LWLockRelease(AddinShmemInitLock);
PrevCheckpointHook = CustomCheckpointHook;
CustomCheckpointHook = LfcCheckpointHook;
}
static void
@@ -327,7 +374,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
if (*newval > lfc_max_size)
{
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
return false;
}
return true;
@@ -436,6 +483,32 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -447,8 +520,326 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif
if (lfc_prewarm_limit != 0)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
RegisterBackgroundWorker(&bgw);
}
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
if (++i == max_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
}
/*
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
* This function saves first mostrecently used pages.
* It is expected to be called at shutdown checkpoint by checkpointer.
*/
void
lfc_save_state(void)
{
size_t n_entries = lfc_prewarm_limit;
FileCacheStateEntry* fs;
if (n_entries == 0)
return;
fs = lfc_get_state(&n_entries);
if (n_entries != 0)
{
#if PG_MAJORVERSION_NUM < 17
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
#else
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
#endif
}
pfree(fs);
}
/*
* Prewarm LFC cache to the specified state.
*
* Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion.
*
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
* same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
* Certainly we can not rely on disk content in this case.
*
* To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
* `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
* In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored.
*
* But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress.
* This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed.
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set.
* So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
{
ssize_t rc;
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
FileCacheEntry *entry;
uint64 generation;
uint32 entry_offset;
uint32 hash;
size_t i;
bool found;
int shard_no;
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
}
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
/*
* Load LFC state and add entries in hash table.
* It is needed to track modification of prewarmed pages.
* All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes
* some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk.
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
*/
for (i = 0; i < n_entries; i++)
{
hash = get_hash_value(lfc_hash, &fs[i].key);
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
/* Do not prewarm chunks which are already present in LFC */
if (!found)
{
entry->offset = lfc_ctl->size++;
entry->hash = hash;
entry->access_count = 0;
entry->prewarm_requested = true;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
/* Most recently visted pages are stored first */
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
lfc_ctl->used += 1;
}
}
LWLockRelease(lfc_lock);
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
while (true)
{
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
{
/*
* In case of prewarming replica we should be careful not to load too new version
* of the page - with LSN larger than current replay LSN.
* At primary we are always loading latest version.
*/
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
.forknum = fs[chunk_no].key.forkNum,
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
.req.lsn = req_lsn,
.req.not_modified_since = 0
};
shard_no = get_shard_number(&fs[chunk_no].key);
while (!page_server->send(shard_no, (NeonRequest *) &request)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
{
NeonResponse * resp;
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
shard_no = get_shard_number(&fs[chunk_no].key);
resp = page_server->receive(shard_no);
lfc_ctl->prewarm_curr_chunk = chunk_no;
if (resp->tag != T_NeonGetPageResponse)
{
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
return;
}
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
if (entry != NULL && entry->prewarm_requested)
{
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
Assert(!entry->prewarm_started);
entry->prewarm_started = true;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("write");
break;
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
if (entry->prewarm_requested)
{
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
lfc_ctl->prewarmed_pages += 1;
}
else
{
lfc_ctl->skipped_pages += 1;
}
Assert(entry->prewarm_started);
entry->prewarm_started = false;
}
LWLockRelease(lfc_lock);
}
}
else
{
Assert(!entry || !entry->prewarm_started);
lfc_ctl->skipped_pages += 1;
LWLockRelease(lfc_lock);
}
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
{
break;
}
}
}
Assert(n_sent == n_received);
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
}
/*
* Load pages from LFC state saved in AUX file.
*/
void
lfc_load_pages(void)
{
int fd;
FileCacheStateEntry *fs;
ssize_t rc;
size_t max_entries = lfc_prewarm_limit;
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
if (fd < 0)
{
elog(LOG, "LFC: state file is missing");
return;
}
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
if (rc <= 0)
{
elog(LOG, "LFC: Failed to read state file: %m");
CloseTransientFile(fd);
}
else
{
CloseTransientFile(fd);
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
}
pfree(fs);
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -616,6 +1007,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */
if (entry->access_count == 0)
{
@@ -861,7 +1253,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/*
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
@@ -899,6 +1291,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (found)
{
if (entry->prewarm_started)
{
/*
* Some page of this chunk is currently written by `lfc_prewarm`.
* We should give-up not to interfere with it.
* But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result.
*/
entry->prewarm_requested = false;
LWLockRelease(lfc_lock);
return;
}
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
@@ -928,7 +1331,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
@@ -944,10 +1347,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
@@ -959,9 +1362,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry->access_count = 1;
entry->hash = hash;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
@@ -1334,3 +1739,74 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(save_local_cache_state);
Datum
save_local_cache_state(PG_FUNCTION_ARGS)
{
lfc_save_state();
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
PG_RETURN_BYTEA_P(res);
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -0,0 +1,28 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION save_local_cache_state()
RETURNS void
AS 'MODULE_PATHNAME', 'save_local_cache_state'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS save_local_cache_state();
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);

View File

@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern void lfc_save_state(void);
extern void lfc_load_pages(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -0,0 +1,52 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
for _ in range(60):
time.sleep(1) # give prewarm BGW some time to proceed
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
if prewarm_info[0] > 0:
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
if prewarm_info[0] == prewarm_info[1]:
break
assert lfc_used_pages > 10000
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
assert prewarm_info[1] > 0

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
],
"v16": [
"16.4",
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
],
"v15": [
"15.8",
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
"a4830163a65811578824ce4022c1cd3daef33d4e"
],
"v14": [
"14.13",
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
]
}