Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-05 03:30:36 +00:00)

Compare commits: split-prox...lfc_fixes (15 commits)
f536dd830c, 831f3a921e, 649517d1d7, ac9b5c4726, 36a821270b, d9d1413383, 3d04854895, f4ca360583, 878f808fb5, 947bc53e0d, 86a2058ad6, 8239e46e44, bd84b3e264, ec8f7f66b3, 9f45f89dcc

The diff below covers the local file cache (LFC) implementation in the Neon Postgres extension, followed by a new regression test, test_runner/regress/test_local_file_cache.py.
@@ -72,6 +72,7 @@
 typedef struct FileCacheEntry
 {
     BufferTag key;
+    uint32 hash;
     uint32 offset;
     uint32 access_count;
     uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -83,6 +84,9 @@ typedef struct FileCacheControl
     uint64 generation; /* generation is needed to handle correct hash reenabling */
     uint32 size; /* size of cache file in chunks */
     uint32 used; /* number of used chunks */
+    uint32 limit; /* shared copy of lfc_size_limit */
+    uint64 hits;
+    uint64 misses;
     dlist_head lru; /* double linked list for LRU replacement algorithm */
 } FileCacheControl;
 
@@ -100,7 +104,9 @@ static shmem_request_hook_type prev_shmem_request_hook;
 #endif
 static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
 
-void FileCacheMonitorMain(Datum main_arg);
+#define LFC_ENABLED() (lfc_ctl->limit != 0)
+
+void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
 
 /*
  * Local file cache is mandatory and Neon can work without it.
@@ -111,49 +117,68 @@ void FileCacheMonitorMain(Datum main_arg);
 static void
 lfc_disable(char const* op)
 {
-    HASH_SEQ_STATUS status;
-    FileCacheEntry* entry;
-
     elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
 
+    /* Invalidate hash */
+    LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
+    if (LFC_ENABLED())
+    {
+        HASH_SEQ_STATUS status;
+        FileCacheEntry* entry;
+
+        hash_seq_init(&status, lfc_hash);
+        while ((entry = hash_seq_search(&status)) != NULL)
+        {
+            hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
+        }
+        lfc_ctl->generation += 1;
+        lfc_ctl->size = 0;
+        lfc_ctl->used = 0;
+        lfc_ctl->limit = 0;
+        dlist_init(&lfc_ctl->lru);
+
+        if (lfc_desc > 0)
+        {
+            /* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
+            int rc = ftruncate(lfc_desc, 0);
+            if (rc < 0)
+                elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
+        }
+    }
+    LWLockRelease(lfc_lock);
+
     if (lfc_desc > 0)
         close(lfc_desc);
 
     lfc_desc = -1;
-    lfc_size_limit = 0;
-
-    /* Invalidate hash */
-    LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
-
-    hash_seq_init(&status, lfc_hash);
-    while ((entry = hash_seq_search(&status)) != NULL)
-    {
-        hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
-        memset(entry->bitmap, 0, sizeof entry->bitmap);
-    }
-    hash_seq_term(&status);
-    lfc_ctl->generation += 1;
-    lfc_ctl->size = 0;
-    lfc_ctl->used = 0;
-    dlist_init(&lfc_ctl->lru);
-
-    LWLockRelease(lfc_lock);
 }
 
+/*
+ * This check is done without obtaining lfc_lock, so it is unreliable
+ */
+static bool
+lfc_maybe_disabled(void)
+{
+    return !lfc_ctl || !LFC_ENABLED();
+}
+
 static bool
 lfc_ensure_opened(void)
 {
+    bool enabled = !lfc_maybe_disabled();
     /* Open cache file if not done yet */
-    if (lfc_desc <= 0)
+    if (lfc_desc <= 0 && enabled)
     {
-        lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
+        lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
 
         if (lfc_desc < 0) {
             lfc_disable("open");
             return false;
         }
     }
-    return true;
+    return enabled;
 }
 
 static void
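Note: the new lfc_maybe_disabled()/LFC_ENABLED() pair establishes the locking pattern used throughout the rest of the patch: a cheap check without lfc_lock on the fast path, followed by an authoritative re-check under the lock before shared state is touched. Below is a minimal standalone sketch of that pattern using plain pthreads rather than PostgreSQL LWLocks; every name and the dummy lookup are illustrative only, not code from the patch.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned cache_limit = 8;        /* 0 means "cache disabled", like lfc_ctl->limit */

static bool cache_maybe_disabled(void)
{
    return cache_limit == 0;            /* read without the lock: may be stale */
}

static bool cache_lookup(int key)
{
    bool found = false;

    if (cache_maybe_disabled())         /* cheap fast exit, mirrors lfc_maybe_disabled() */
        return false;

    pthread_mutex_lock(&cache_lock);
    if (cache_limit != 0)               /* authoritative re-check, mirrors LFC_ENABLED() */
        found = (key % 2 == 0);         /* stand-in for the real hash lookup */
    pthread_mutex_unlock(&cache_lock);
    return found;
}

int main(void)
{
    printf("lookup(4) -> %d\n", (int) cache_lookup(4));
    return 0;
}
```

The unlocked read may be stale, which is acceptable because it only decides whether to skip work early; the decision that matters is re-made while the lock is held.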
@@ -172,6 +197,7 @@ lfc_shmem_startup(void)
     lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
     if (!found)
     {
+        int fd;
         uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
         lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
         info.keysize = sizeof(BufferTag);
@@ -184,10 +210,22 @@ lfc_shmem_startup(void)
         lfc_ctl->generation = 0;
         lfc_ctl->size = 0;
         lfc_ctl->used = 0;
+        lfc_ctl->hits = 0;
+        lfc_ctl->misses = 0;
         dlist_init(&lfc_ctl->lru);
 
-        /* Remove file cache on restart */
-        (void)unlink(lfc_path);
+        /* Recreate file cache on restart */
+        fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
+        if (fd < 0)
+        {
+            elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
+            lfc_ctl->limit = 0;
+        }
+        else
+        {
+            close(fd);
+            lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
+        }
     }
     LWLockRelease(AddinShmemInitLock);
 }
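Note: creating and truncating the cache file once at shmem startup, together with dropping O_CREAT from lfc_ensure_opened() earlier in this diff, changes what happens when the file disappears at runtime: a later open simply fails and the cache disables itself instead of silently recreating an empty file. A small sketch of that difference, independent of the patch (the path is invented for the example):

```c
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
    const char *path = "/tmp/lfc-demo.cache";   /* hypothetical path, not the real GUC value */

    int fd = open(path, O_RDWR);                /* what lfc_ensure_opened() now does */
    if (fd < 0)
        printf("open(O_RDWR) failed: %s -> the cache would disable itself\n", strerror(errno));
    else
        close(fd);

    fd = open(path, O_RDWR | O_CREAT, 0600);    /* the old behaviour: silently creates a file */
    if (fd >= 0)
    {
        printf("open(O_RDWR|O_CREAT) created a fresh empty file instead of failing\n");
        close(fd);
        unlink(path);
    }
    return 0;
}
```

This is the kind of situation the new regression test at the end of this diff provokes by renaming the cache directory while updates are running.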
@@ -204,6 +242,17 @@ lfc_shmem_request(void)
     RequestNamedLWLockTranche("lfc_lock", 1);
 }
 
+static bool
+is_normal_backend(void)
+{
+    /*
+     * Stats collector detach shared memory, so we should not try to access shared memory here.
+     * Parallel workers first assign default value (0), so not perform truncation in parallel workers.
+     * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
+     */
+    return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
+}
+
 static bool
 lfc_check_limit_hook(int *newval, void **extra, GucSource source)
 {
@@ -219,25 +268,15 @@ static void
 lfc_change_limit_hook(int newval, void *extra)
 {
     uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
-    /*
-     * Stats collector detach shared memory, so we should not try to access shared memory here.
-     * Parallel workers first assign default value (0), so not perform truncation in parallel workers.
-     * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
-     */
-    if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
+
+    if (!is_normal_backend())
+        return;
+
+    if (!lfc_ensure_opened())
         return;
 
-    /* Open cache file if not done yet */
-    if (lfc_desc <= 0)
-    {
-        lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
-        if (lfc_desc < 0) {
-            elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
-            lfc_size_limit = 0; /* disable file cache */
-            return;
-        }
-    }
     LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
 
     while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
     {
         /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -247,10 +286,12 @@ lfc_change_limit_hook(int newval, void *extra)
         if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
             elog(LOG, "Failed to punch hole in file: %m");
 #endif
-        hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
+        hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
         lfc_ctl->used -= 1;
     }
+    lfc_ctl->limit = new_size;
     elog(DEBUG1, "set local file cache limit to %d", new_size);
 
     LWLockRelease(lfc_lock);
 }
 
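Note: the shrink path keeps using fallocate(FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE), which releases an evicted chunk's blocks back to the filesystem without changing the file's apparent size, so the offsets of the remaining chunks stay valid. A minimal Linux-only sketch of that call, separate from the patch (file name and sizes are invented for the example):

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    /* Hypothetical scratch file, standing in for the LFC cache file. */
    int fd = open("/tmp/lfc-hole-demo.cache", O_RDWR | O_CREAT, 0600);
    if (fd < 0)
        return 1;

    /* Reserve 1 MB, then give the first 128 kB back to the filesystem
     * without changing the file size, so later offsets stay valid. */
    if (ftruncate(fd, 1024 * 1024) < 0)
        perror("ftruncate");
    if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, 0, 128 * 1024) < 0)
        perror("fallocate");    /* Linux-specific call */

    close(fd);
    return 0;
}
```

The call is platform-specific, which is why it sits inside the conditional block closed by the #endif in the hunk above.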
@@ -268,7 +309,7 @@ lfc_change_limit_hook(int newval, void *extra)
  * disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
  * Calling statvfs each second should not add any noticeable overhead.
  */
-void
+PGDLLEXPORT void
 FileCacheMonitorMain(Datum main_arg)
 {
     /*
@@ -286,7 +327,7 @@ FileCacheMonitorMain(Datum main_arg)
     /* Periodically dump buffers until terminated. */
     while (!ShutdownRequestPending)
     {
-        if (lfc_size_limit != 0)
+        if (!lfc_maybe_disabled())
         {
             struct statvfs sfs;
             if (statvfs(lfc_path, &sfs) < 0)
@@ -300,7 +341,7 @@ FileCacheMonitorMain(Datum main_arg)
             if (lfc_shrinking_factor < 31) {
                 lfc_shrinking_factor += 1;
             }
-            lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
+            lfc_change_limit_hook(lfc_ctl->limit >> lfc_shrinking_factor, NULL);
         }
         else
             lfc_shrinking_factor = 0; /* reset to initial value */
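Note: with this change the monitor shrinks relative to the shared lfc_ctl->limit rather than the backend-local lfc_size_limit GUC, and each pass while free space stays below the watermark shifts the (already reduced) limit further right. A tiny sketch of how that schedule behaves if the watermark keeps being exceeded, assuming the hook applies each new value as the previous hunk shows (the starting value is arbitrary):

```c
#include <stdio.h>

int main(void)
{
    unsigned limit = 1280;              /* example starting limit, in chunks */

    for (int factor = 1; factor <= 5 && limit > 0; factor++)
    {
        limit >>= factor;               /* mirrors lfc_ctl->limit >> lfc_shrinking_factor */
        printf("shrinking_factor=%d -> limit now %u chunks\n", factor, limit);
    }
    return 0;
}
```

Once free space recovers, the factor resets to 0, as the trailing context line above shows.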
@@ -338,6 +379,7 @@ lfc_init(void)
     if (!process_shared_preload_libraries_in_progress)
         elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
 
+
     DefineCustomIntVariable("neon.max_file_cache_size",
                             "Maximal size of Neon local file cache",
                             NULL,
@@ -414,10 +456,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
     BufferTag tag;
     FileCacheEntry* entry;
     int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
-    bool found;
+    bool found = false;
     uint32 hash;
 
-    if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
+    if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
         return false;
 
     CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -426,8 +468,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
     hash = get_hash_value(lfc_hash, &tag);
 
     LWLockAcquire(lfc_lock, LW_SHARED);
-    entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
-    found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
+    if (LFC_ENABLED())
+    {
+        entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
+        found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
+    }
     LWLockRelease(lfc_lock);
     return found;
 }
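Note: the bitmap test above encodes a two-level address: a block number is split into the chunk it belongs to and its slot inside the chunk, and the slot selects one bit of the per-entry bitmap. A small sketch of the arithmetic, assuming BLOCKS_PER_CHUNK is 128 (the real constant is defined elsewhere in the extension):

```c
#include <stdint.h>
#include <stdio.h>

#define BLOCKS_PER_CHUNK 128    /* assumption for the example only */

int main(void)
{
    uint32_t blkno = 1234;
    uint32_t chunk_start = blkno & ~(BLOCKS_PER_CHUNK - 1);  /* tag.blockNum of the owning chunk */
    uint32_t chunk_offs  = blkno & (BLOCKS_PER_CHUNK - 1);   /* slot of the block inside the chunk */
    uint32_t word = chunk_offs >> 5;                         /* which uint32 in entry->bitmap[] */
    uint32_t bit  = chunk_offs & 31;                         /* which bit inside that word */

    printf("block %u -> chunk starting at block %u, bitmap[%u] bit %u\n",
           blkno, chunk_start, word, bit);
    return 0;
}
```

bitmap[chunk_offs >> 5] picks the 32-bit word and 1 << (chunk_offs & 31) the bit within it, which is exactly the expression used in lfc_cache_contains(), lfc_read() and lfc_write().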
@@ -444,7 +489,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
     int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
     uint32 hash;
 
-    if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
+    if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
         return;
 
     CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -454,6 +499,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
     hash = get_hash_value(lfc_hash, &tag);
 
     LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
+    if (!LFC_ENABLED())
+    {
+        LWLockRelease(lfc_lock);
+        return;
+    }
+
     entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
 
     if (!found)
@@ -504,7 +556,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
 /*
  * Try to read page from local cache.
  * Returns true if page is found in local cache.
- * In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
+ * In case of error local file cache is disabled (lfc->limit is set to zero).
  */
 bool
 lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -519,7 +571,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
     uint64 generation;
     uint32 entry_offset;
 
-    if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
+    if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
         return false;
 
     if (!lfc_ensure_opened())
@@ -531,10 +583,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
     hash = get_hash_value(lfc_hash, &tag);
 
     LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
+    if (!LFC_ENABLED())
+    {
+        LWLockRelease(lfc_lock);
+        return false;
+    }
+
     entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
     if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
     {
         /* Page is not cached */
+        lfc_ctl->misses += 1; /* race condition here, but precise value is not needed */
         LWLockRelease(lfc_lock);
         return false;
     }
@@ -555,8 +615,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 
     /* Place entry to the head of LRU list */
     LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
     if (lfc_ctl->generation == generation)
     {
+        Assert(LFC_ENABLED());
+        lfc_ctl->hits += 1;
         Assert(entry->access_count > 0);
         if (--entry->access_count == 0)
             dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -588,7 +651,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
     int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
     uint32 hash;
 
-    if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
+    if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
         return;
 
     if (!lfc_ensure_opened())
@@ -596,12 +659,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
 
     tag.forkNum = forkNum;
     tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
 
     CopyNRelFileInfoToBufTag(tag, rinfo);
 
     hash = get_hash_value(lfc_hash, &tag);
 
     LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
+
+    if (!LFC_ENABLED())
+    {
+        LWLockRelease(lfc_lock);
+        return;
+    }
+
     entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
 
     if (found)
@@ -620,13 +688,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
      * there are should be very large number of concurrent IO operations and them are limited by max_connections,
      * we prefer not to complicate code and use second approach.
      */
-    if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
+    if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
     {
         /* Cache overflow: evict least recently used chunk */
         FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
         Assert(victim->access_count == 0);
         entry->offset = victim->offset; /* grab victim's chunk */
-        hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
+        hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
         elog(DEBUG2, "Swap file cache page");
     }
     else
@@ -635,6 +703,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
         entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
     }
     entry->access_count = 1;
+    entry->hash = hash;
     memset(entry->bitmap, 0, sizeof entry->bitmap);
     }
 
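Note: storing entry->hash at insert time is what lets the eviction paths above call hash_search_with_hash_value() instead of rehashing the victim's key. A generic sketch of the idea outside dynahash; the FNV-1a hash and the Entry layout are stand-ins, not the extension's code:

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
    char     key[16];
    uint32_t hash;              /* computed once at insert time, like entry->hash */
    int      value;
} Entry;

static uint32_t hash_key(const char *key)
{
    uint32_t h = 2166136261u;   /* FNV-1a, a stand-in for the real hash function */

    for (; *key; key++)
        h = (h ^ (uint8_t) *key) * 16777619u;
    return h;
}

int main(void)
{
    Entry e;

    strcpy(e.key, "block-42");
    e.hash = hash_key(e.key);   /* store the hash alongside the entry */
    e.value = 1;

    /* Later, eviction can reuse e.hash instead of hashing e.key again. */
    printf("remove %s using cached hash %u\n", e.key, e.hash);
    return 0;
}
```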
@@ -679,6 +748,23 @@ typedef struct
     LocalCachePagesRec *record;
 } LocalCachePagesContext;
 
+
+PG_FUNCTION_INFO_V1(local_cache_hits);
+Datum
+local_cache_hits(PG_FUNCTION_ARGS)
+{
+    PG_RETURN_INT64(lfc_ctl ? lfc_ctl->hits : -1);
+}
+
+
+PG_FUNCTION_INFO_V1(local_cache_misses);
+Datum
+local_cache_misses(PG_FUNCTION_ARGS)
+{
+    PG_RETURN_INT64(lfc_ctl ? lfc_ctl->misses : -1);
+}
+
+
 /*
  * Function returning data from the local file cache
  * relation node/tablespace/database/blocknum and access_counter
@@ -752,13 +838,15 @@ local_cache_pages(PG_FUNCTION_ARGS)
 
         LWLockAcquire(lfc_lock, LW_SHARED);
 
-        hash_seq_init(&status, lfc_hash);
-        while ((entry = hash_seq_search(&status)) != NULL)
+        if (LFC_ENABLED())
         {
-            for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
-                n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
+            hash_seq_init(&status, lfc_hash);
+            while ((entry = hash_seq_search(&status)) != NULL)
+            {
+                for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
+                    n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
+            }
         }
-        hash_seq_term(&status);
         fctx->record = (LocalCachePagesRec *)
             MemoryContextAllocHuge(CurrentMemoryContext,
                                    sizeof(LocalCachePagesRec) * n_pages);
@@ -770,35 +858,37 @@ local_cache_pages(PG_FUNCTION_ARGS)
         /* Return to original context when allocating transient memory */
         MemoryContextSwitchTo(oldcontext);
 
-        /*
-         * Scan through all the buffers, saving the relevant fields in the
-         * fctx->record structure.
-         *
-         * We don't hold the partition locks, so we don't get a consistent
-         * snapshot across all buffers, but we do grab the buffer header
-         * locks, so the information of each buffer is self-consistent.
-         */
-        n_pages = 0;
-        hash_seq_init(&status, lfc_hash);
-        while ((entry = hash_seq_search(&status)) != NULL)
+        if (n_pages != 0)
         {
-            for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
+            /*
+             * Scan through all the buffers, saving the relevant fields in the
+             * fctx->record structure.
+             *
+             * We don't hold the partition locks, so we don't get a consistent
+             * snapshot across all buffers, but we do grab the buffer header
+             * locks, so the information of each buffer is self-consistent.
+             */
+            n_pages = 0;
+            hash_seq_init(&status, lfc_hash);
+            while ((entry = hash_seq_search(&status)) != NULL)
             {
-                if (entry->bitmap[i >> 5] & (1 << (i & 31)))
+                for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
                 {
-                    fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
-                    fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
-                    fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
-                    fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
-                    fctx->record[n_pages].forknum = entry->key.forkNum;
-                    fctx->record[n_pages].blocknum = entry->key.blockNum + i;
-                    fctx->record[n_pages].accesscount = entry->access_count;
-                    n_pages += 1;
+                    if (entry->bitmap[i >> 5] & (1 << (i & 31)))
+                    {
+                        fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
+                        fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
+                        fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
+                        fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
+                        fctx->record[n_pages].forknum = entry->key.forkNum;
+                        fctx->record[n_pages].blocknum = entry->key.blockNum + i;
+                        fctx->record[n_pages].accesscount = entry->access_count;
+                        n_pages += 1;
+                    }
                 }
             }
+            Assert(n_pages == funcctx->max_calls);
         }
-        hash_seq_term(&status);
-        Assert(n_pages == funcctx->max_calls);
         LWLockRelease(lfc_lock);
     }
 
test_runner/regress/test_local_file_cache.py (new file, 74 lines)
@@ -0,0 +1,74 @@
+import os
+import random
+import threading
+import time
+from typing import List
+
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.utils import query_scalar
+
+
+def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+
+    cache_dir = os.path.join(env.repo_dir, "file_cache")
+    os.mkdir(cache_dir)
+
+    env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
+
+    endpoint = env.endpoints.create_start(
+        "test_local_file_cache_unlink",
+        config_lines=[
+            "shared_buffers='1MB'",
+            f"neon.file_cache_path='{cache_dir}/file.cache'",
+            "neon.max_file_cache_size='64MB'",
+            "neon.file_cache_size_limit='10MB'",
+        ],
+    )
+
+    cur = endpoint.connect().cursor()
+
+    n_rows = 100000
+    n_threads = 20
+    n_updates_per_thread = 10000
+    n_updates_per_connection = 1000
+    n_total_updates = n_threads * n_updates_per_thread
+
+    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
+    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
+
+    # Start threads that will perform random UPDATEs. Each UPDATE
+    # increments the counter on the row, so that we can check at the
+    # end that the sum of all the counters match the number of updates
+    # performed (plus the initial 1 on each row).
+    #
+    # Furthermore, each thread will reconnect between every 1000 updates.
+    def run_updates():
+        n_updates_performed = 0
+        conn = endpoint.connect()
+        cur = conn.cursor()
+        for _ in range(n_updates_per_thread):
+            id = random.randint(1, n_rows)
+            cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
+            n_updates_performed += 1
+            if n_updates_performed % n_updates_per_connection == 0:
+                cur.close()
+                conn.close()
+                conn = endpoint.connect()
+                cur = conn.cursor()
+
+    threads: List[threading.Thread] = []
+    for _i in range(n_threads):
+        thread = threading.Thread(target=run_updates, args=(), daemon=True)
+        thread.start()
+        threads.append(thread)
+
+    time.sleep(5)
+
+    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
+    os.rename(cache_dir, new_cache_dir)
+
+    for thread in threads:
+        thread.join()
+
+    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows