Compare commits

...

15 Commits

Author SHA1 Message Date
Konstantin Knizhnik
f536dd830c Add functions for reporting LFC hits and misses 2023-10-06 09:22:30 +03:00
Konstantin Knizhnik
831f3a921e Use hash_search_with_hash_value instead of hash_search in file_cache 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
649517d1d7 Create local file cache file only on startup and truncate it in case of errors 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
ac9b5c4726 Make poetry run black happy 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
36a821270b Change local_file_cache.py test to rename directory rather than unlinking file 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
d9d1413383 Further LFC refactoring 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
3d04854895 Further LFC refactoring 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
f4ca360583 Undo bump of Postgres version 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
878f808fb5 Remove local_file_cache GUC 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
947bc53e0d Update submodule version 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
86a2058ad6 Import os 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
8239e46e44 Make ruff happy 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
bd84b3e264 Add local_file_cache GUC 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
ec8f7f66b3 Add local_file_cache GUC 2023-09-25 21:49:22 +03:00
Konstantin Knizhnik
9f45f89dcc refer #5347
refer #5351

Make changing state of local file cache more consistent
2023-09-25 21:49:22 +03:00
2 changed files with 250 additions and 86 deletions

View File

@@ -72,6 +72,7 @@
typedef struct FileCacheEntry typedef struct FileCacheEntry
{ {
BufferTag key; BufferTag key;
uint32 hash;
uint32 offset; uint32 offset;
uint32 access_count; uint32 access_count;
uint32 bitmap[BLOCKS_PER_CHUNK/32]; uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -83,6 +84,9 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */ uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */ uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */ uint32 used; /* number of used chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
dlist_head lru; /* double linked list for LRU replacement algorithm */ dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl; } FileCacheControl;
@@ -100,7 +104,9 @@ static shmem_request_hook_type prev_shmem_request_hook;
#endif #endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */ static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
void FileCacheMonitorMain(Datum main_arg); #define LFC_ENABLED() (lfc_ctl->limit != 0)
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
/* /*
* Local file cache is optional and Neon can work without it. * Local file cache is optional and Neon can work without it.
@@ -111,49 +117,68 @@ void FileCacheMonitorMain(Datum main_arg);
static void static void
lfc_disable(char const* op) lfc_disable(char const* op)
{ {
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
if (lfc_desc > 0)
{
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
int rc = ftruncate(lfc_desc, 0);
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
}
}
LWLockRelease(lfc_lock);
if (lfc_desc > 0) if (lfc_desc > 0)
close(lfc_desc); close(lfc_desc);
lfc_desc = -1; lfc_desc = -1;
lfc_size_limit = 0; }
/* Invalidate hash */ /*
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); * This check is done without obtaining lfc_lock, so it is unreliable
*/
hash_seq_init(&status, lfc_hash); static bool
while ((entry = hash_seq_search(&status)) != NULL) lfc_maybe_disabled(void)
{ {
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL); return !lfc_ctl || !LFC_ENABLED();
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
LWLockRelease(lfc_lock);
} }
static bool static bool
lfc_ensure_opened(void) lfc_ensure_opened(void)
{ {
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */ /* Open cache file if not done yet */
if (lfc_desc <= 0) if (lfc_desc <= 0 && enabled)
{ {
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
if (lfc_desc < 0) { if (lfc_desc < 0) {
lfc_disable("open"); lfc_disable("open");
return false; return false;
} }
} }
return true; return enabled;
} }
static void static void
@@ -172,6 +197,7 @@ lfc_shmem_startup(void)
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found) if (!found)
{ {
int fd;
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock"); lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag); info.keysize = sizeof(BufferTag);
@@ -184,10 +210,22 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0; lfc_ctl->generation = 0;
lfc_ctl->size = 0; lfc_ctl->size = 0;
lfc_ctl->used = 0; lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */ /* Recreate file cache on restart */
(void)unlink(lfc_path); fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
{
close(fd);
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
}
} }
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
} }
@@ -204,6 +242,17 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1); RequestNamedLWLockTranche("lfc_lock", 1);
} }
/*
 * Report whether the current process is one in which the shared state of the
 * local file cache may be touched.
 *
 * The checks exist for the following reasons:
 *  - The stats collector detaches shared memory, so shared memory must not be
 *    accessed from it.
 *  - The postmaster can handle SIGHUP and still has shared memory attached
 *    (UsedShmemSegAddr != NULL), but it has no PGPROC.
 *  - Parallel workers first assign the default value (0), so truncation must
 *    not be performed in parallel workers.
 *
 * Returns true only when lfc_ctl, MyProc and UsedShmemSegAddr are all set and
 * the process is not a parallel worker.
 */
static bool
is_normal_backend(void)
{
	if (lfc_ctl == NULL || MyProc == NULL || UsedShmemSegAddr == NULL)
		return false;

	return !IsParallelWorker();
}
static bool static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source) lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{ {
@@ -219,25 +268,15 @@ static void
lfc_change_limit_hook(int newval, void *extra) lfc_change_limit_hook(int newval, void *extra)
{ {
uint32 new_size = SIZE_MB_TO_CHUNKS(newval); uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
/*
* Stats collector detach shared memory, so we should not try to access shared memory here. if (!is_normal_backend())
* Parallel workers first assign default value (0), so not perform truncation in parallel workers. return;
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/ if (!lfc_ensure_opened())
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
return; return;
/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{ {
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -247,10 +286,12 @@ lfc_change_limit_hook(int newval, void *extra)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m"); elog(LOG, "Failed to punch hole in file: %m");
#endif #endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1; lfc_ctl->used -= 1;
} }
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size); elog(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
} }
@@ -268,7 +309,7 @@ lfc_change_limit_hook(int newval, void *extra)
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second. * disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
* Calling statvfs each second should not add any noticeable overhead. * Calling statvfs each second should not add any noticeable overhead.
*/ */
void PGDLLEXPORT void
FileCacheMonitorMain(Datum main_arg) FileCacheMonitorMain(Datum main_arg)
{ {
/* /*
@@ -286,7 +327,7 @@ FileCacheMonitorMain(Datum main_arg)
/* Periodically dump buffers until terminated. */ /* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending) while (!ShutdownRequestPending)
{ {
if (lfc_size_limit != 0) if (!lfc_maybe_disabled())
{ {
struct statvfs sfs; struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0) if (statvfs(lfc_path, &sfs) < 0)
@@ -300,7 +341,7 @@ FileCacheMonitorMain(Datum main_arg)
if (lfc_shrinking_factor < 31) { if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1; lfc_shrinking_factor += 1;
} }
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL); lfc_change_limit_hook(lfc_ctl->limit >> lfc_shrinking_factor, NULL);
} }
else else
lfc_shrinking_factor = 0; /* reset to initial value */ lfc_shrinking_factor = 0; /* reset to initial value */
@@ -338,6 +379,7 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress) if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size", DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache", "Maximal size of Neon local file cache",
NULL, NULL,
@@ -414,10 +456,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag; BufferTag tag;
FileCacheEntry* entry; FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found; bool found = false;
uint32 hash; uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false; return false;
CopyNRelFileInfoToBufTag(tag, rinfo); CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -426,8 +468,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED); LWLockAcquire(lfc_lock, LW_SHARED);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); if (LFC_ENABLED())
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; {
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
return found; return found;
} }
@@ -444,7 +489,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash; uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return; return;
CopyNRelFileInfoToBufTag(tag, rinfo); CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -454,6 +499,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found) if (!found)
@@ -504,7 +556,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* /*
* Try to read page from local cache. * Try to read page from local cache.
* Returns true if page is found in local cache. * Returns true if page is found in local cache.
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache. * In case of error local file cache is disabled (lfc->limit is set to zero).
*/ */
bool bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -519,7 +571,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation; uint64 generation;
uint32 entry_offset; uint32 entry_offset;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false; return false;
if (!lfc_ensure_opened()) if (!lfc_ensure_opened())
@@ -531,10 +583,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{ {
/* Page is not cached */ /* Page is not cached */
lfc_ctl->misses += 1; /* race condition here, but precise value is not needed */
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
return false; return false;
} }
@@ -555,8 +615,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Place entry to the head of LRU list */ /* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation) if (lfc_ctl->generation == generation)
{ {
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
Assert(entry->access_count > 0); Assert(entry->access_count > 0);
if (--entry->access_count == 0) if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -588,7 +651,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash; uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return; return;
if (!lfc_ensure_opened()) if (!lfc_ensure_opened())
@@ -596,12 +659,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
tag.forkNum = forkNum; tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
CopyNRelFileInfoToBufTag(tag, rinfo); CopyNRelFileInfoToBufTag(tag, rinfo);
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found) if (found)
@@ -620,13 +688,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* there are should be very large number of concurrent IO operations and them are limited by max_connections, * there are should be very large number of concurrent IO operations and them are limited by max_connections,
* we prefer not to complicate code and use second approach. * we prefer not to complicate code and use second approach.
*/ */
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{ {
/* Cache overflow: evict least recently used chunk */ /* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0); Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */ entry->offset = victim->offset; /* grab victim's chunk */
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page"); elog(DEBUG2, "Swap file cache page");
} }
else else
@@ -635,6 +703,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
} }
entry->access_count = 1; entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap); memset(entry->bitmap, 0, sizeof entry->bitmap);
} }
@@ -679,6 +748,23 @@ typedef struct
LocalCachePagesRec *record; LocalCachePagesRec *record;
} LocalCachePagesContext; } LocalCachePagesContext;
PG_FUNCTION_INFO_V1(local_cache_hits);

/*
 * SQL-callable function reporting the local file cache hit counter.
 *
 * Returns lfc_ctl->hits as a 64-bit integer, or -1 when lfc_ctl is not set.
 * The counter is read without taking lfc_lock.
 */
Datum
local_cache_hits(PG_FUNCTION_ARGS)
{
	if (lfc_ctl == NULL)
	{
		PG_RETURN_INT64(-1);
	}

	PG_RETURN_INT64(lfc_ctl->hits);
}
PG_FUNCTION_INFO_V1(local_cache_misses);

/*
 * SQL-callable function reporting the local file cache miss counter.
 *
 * Returns lfc_ctl->misses as a 64-bit integer, or -1 when lfc_ctl is not set.
 * The counter is read without taking lfc_lock.
 */
Datum
local_cache_misses(PG_FUNCTION_ARGS)
{
	if (lfc_ctl == NULL)
	{
		PG_RETURN_INT64(-1);
	}

	PG_RETURN_INT64(lfc_ctl->misses);
}
/* /*
* Function returning data from the local file cache * Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter * relation node/tablespace/database/blocknum and access_counter
@@ -752,13 +838,15 @@ local_cache_pages(PG_FUNCTION_ARGS)
LWLockAcquire(lfc_lock, LW_SHARED); LWLockAcquire(lfc_lock, LW_SHARED);
hash_seq_init(&status, lfc_hash); if (LFC_ENABLED())
while ((entry = hash_seq_search(&status)) != NULL)
{ {
for (int i = 0; i < BLOCKS_PER_CHUNK; i++) hash_seq_init(&status, lfc_hash);
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
}
} }
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *) fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext, MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages); sizeof(LocalCachePagesRec) * n_pages);
@@ -770,35 +858,37 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */ /* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* if (n_pages != 0)
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{ {
for (int i = 0; i < BLOCKS_PER_CHUNK; i++) /*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{ {
if (entry->bitmap[i >> 5] & (1 << (i & 31))) for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{ {
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; if (entry->bitmap[i >> 5] & (1 << (i & 31)))
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); {
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum; fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].blocknum = entry->key.blockNum + i; fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].accesscount = entry->access_count; fctx->record[n_pages].forknum = entry->key.forkNum;
n_pages += 1; fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
}
} }
} }
Assert(n_pages == funcctx->max_calls);
} }
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
} }

View File

@@ -0,0 +1,74 @@
import os
import random
import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
    """
    Run concurrent UPDATEs while the local file cache directory is renamed.

    An endpoint is started with its file cache placed in a dedicated
    directory. Several threads then perform random single-row UPDATEs,
    reconnecting periodically, and after a few seconds the cache directory
    is renamed underneath the running endpoint. The test passes when the sum
    of all counters equals the number of rows plus the number of updates,
    i.e. no update was lost or applied twice.
    """
    env = neon_simple_env

    cache_dir = os.path.join(env.repo_dir, "file_cache")
    os.mkdir(cache_dir)

    env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")

    endpoint = env.endpoints.create_start(
        "test_local_file_cache_unlink",
        config_lines=[
            "shared_buffers='1MB'",
            f"neon.file_cache_path='{cache_dir}/file.cache'",
            "neon.max_file_cache_size='64MB'",
            "neon.file_cache_size_limit='10MB'",
        ],
    )

    cur = endpoint.connect().cursor()

    n_rows = 100000
    n_threads = 20
    n_updates_per_thread = 10000
    n_updates_per_connection = 1000
    n_total_updates = n_threads * n_updates_per_thread

    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters matches the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread uses a fresh connection for every
    # n_updates_per_connection updates.
    def run_updates():
        n_remaining = n_updates_per_thread
        while n_remaining > 0:
            batch_size = min(n_remaining, n_updates_per_connection)
            conn = endpoint.connect()
            try:
                worker_cur = conn.cursor()
                for _ in range(batch_size):
                    row_id = random.randint(1, n_rows)
                    worker_cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {row_id}")
                worker_cur.close()
            finally:
                # Always release the connection, including the last one and
                # the one in use when an UPDATE fails.
                conn.close()
            n_remaining -= batch_size

    threads: List[threading.Thread] = []
    for _i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(), daemon=True)
        thread.start()
        threads.append(thread)

    # Let the workers run for a while before moving the cache directory away.
    time.sleep(5)

    # Rename the directory that holds the cache file while updates are running.
    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
    os.rename(cache_dir, new_cache_dir)

    for thread in threads:
        thread.join()

    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows