From 6afbadc90e8b97bce72cddd362ecf7500e263752 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 23 Nov 2023 08:59:19 +0200 Subject: [PATCH] LFC fixes + statistics (#5727) ## Problem ## Summary of changes See #5500 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik --- pgxn/neon/Makefile | 2 +- pgxn/neon/file_cache.c | 383 ++++++++++++++----- pgxn/neon/neon--1.0--1.1.sql | 10 + pgxn/neon/neon.control | 2 +- test_runner/regress/test_local_file_cache.py | 74 ++++ 5 files changed, 373 insertions(+), 98 deletions(-) create mode 100644 pgxn/neon/neon--1.0--1.1.sql create mode 100644 test_runner/regress/test_local_file_cache.py diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 84835843bc..7fc99523db 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -20,7 +20,7 @@ SHLIB_LINK_INTERNAL = $(libpq) SHLIB_LINK = -lcurl EXTENSION = neon -DATA = neon--1.0.sql +DATA = neon--1.0.sql neon--1.0--1.1.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" EXTRA_CLEAN = \ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 22f20a4c0b..b28b95e42f 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -32,11 +32,13 @@ #include "storage/latch.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "utils/builtins.h" #include "utils/dynahash.h" #include "utils/guc.h" #include "storage/fd.h" #include "storage/pg_shmem.h" #include "storage/buf_internals.h" +#include "pgstat.h" /* * Local file cache is used to temporary store relations pages in local file 
system. @@ -65,6 +67,7 @@ typedef struct FileCacheEntry { BufferTag key; + uint32 hash; uint32 offset; uint32 access_count; uint32 bitmap[BLOCKS_PER_CHUNK/32]; @@ -76,6 +79,10 @@ typedef struct FileCacheControl uint64 generation; /* generation is needed to handle correct hash reenabling */ uint32 size; /* size of cache file in chunks */ uint32 used; /* number of used chunks */ + uint32 limit; /* shared copy of lfc_size_limit */ + uint64 hits; + uint64 misses; + uint64 writes; dlist_head lru; /* double linked list for LRU replacement algorithm */ } FileCacheControl; @@ -91,10 +98,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook; static shmem_request_hook_type prev_shmem_request_hook; #endif -void FileCacheMonitorMain(Datum main_arg); +#define LFC_ENABLED() (lfc_ctl->limit != 0) + +void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg); /* - * Local file cache is mandatory and Neon can work without it. + * Local file cache is optional and Neon can work without it. * In case of any any errors with this cache, we should disable it but to not throw error. * Also we should allow re-enable it if source of failure (lack of disk space, permissions,...) is fixed. 
* All cache content should be invalidated to avoid reading of stale or corrupted data @@ -102,49 +111,77 @@ void FileCacheMonitorMain(Datum main_arg); static void lfc_disable(char const* op) { - HASH_SEQ_STATUS status; - FileCacheEntry* entry; - + int fd; elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + /* Invalidate hash */ + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (LFC_ENABLED()) + { + HASH_SEQ_STATUS status; + FileCacheEntry* entry; + + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL); + } + lfc_ctl->generation += 1; + lfc_ctl->size = 0; + lfc_ctl->used = 0; + lfc_ctl->limit = 0; + dlist_init(&lfc_ctl->lru); + + if (lfc_desc > 0) + { + /* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */ + int rc = ftruncate(lfc_desc, 0); + if (rc < 0) + elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); + } + } + /* We need to use unlink to avoid races in LFC write, because it is not protected by the lock */ + unlink(lfc_path); + + fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC); + if (fd < 0) + elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path); + else + close(fd); + + LWLockRelease(lfc_lock); + if (lfc_desc > 0) close(lfc_desc); lfc_desc = -1; - lfc_size_limit = 0; +} - /* Invalidate hash */ - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) - { - hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL); - memset(entry->bitmap, 0, sizeof entry->bitmap); - } - hash_seq_term(&status); - lfc_ctl->generation += 1; - lfc_ctl->size = 0; - lfc_ctl->used = 0; - dlist_init(&lfc_ctl->lru); - - LWLockRelease(lfc_lock); +/* + * This check is done without obtaining lfc_lock, so it is unreliable + */ +static bool +lfc_maybe_disabled(void) +{ + return 
!lfc_ctl || !LFC_ENABLED(); } static bool lfc_ensure_opened(void) { + bool enabled = !lfc_maybe_disabled(); /* Open cache file if not done yet */ - if (lfc_desc <= 0) + if (lfc_desc <= 0 && enabled) { - lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); + lfc_desc = BasicOpenFile(lfc_path, O_RDWR); if (lfc_desc < 0) { lfc_disable("open"); return false; } } - return true; + return enabled; } static void @@ -163,6 +200,7 @@ lfc_shmem_startup(void) lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); if (!found) { + int fd; uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock"); info.keysize = sizeof(BufferTag); @@ -175,10 +213,23 @@ lfc_shmem_startup(void) lfc_ctl->generation = 0; lfc_ctl->size = 0; lfc_ctl->used = 0; + lfc_ctl->hits = 0; + lfc_ctl->misses = 0; + lfc_ctl->writes = 0; dlist_init(&lfc_ctl->lru); - /* Remove file cache on restart */ - (void)unlink(lfc_path); + /* Recreate file cache on restart */ + fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC); + if (fd < 0) + { + elog(WARNING, "Failed to create local file cache %s: %m", lfc_path); + lfc_ctl->limit = 0; + } + else + { + close(fd); + lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit); + } } LWLockRelease(AddinShmemInitLock); } @@ -195,6 +246,17 @@ lfc_shmem_request(void) RequestNamedLWLockTranche("lfc_lock", 1); } +static bool +is_normal_backend(void) +{ + /* + * The stats collector detaches shared memory, so we should not try to access shared memory here. + * Parallel workers first assign the default value (0), so they do not perform truncation. + * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. 
+ */ + return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker(); +} + static bool lfc_check_limit_hook(int *newval, void **extra, GucSource source) { @@ -210,25 +272,15 @@ static void lfc_change_limit_hook(int newval, void *extra) { uint32 new_size = SIZE_MB_TO_CHUNKS(newval); - /* - * Stats collector detach shared memory, so we should not try to access shared memory here. - * Parallel workers first assign default value (0), so not perform truncation in parallel workers. - * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. - */ - if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker()) + + if (!is_normal_backend()) + return; + + if (!lfc_ensure_opened()) return; - /* Open cache file if not done yet */ - if (lfc_desc <= 0) - { - lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); - if (lfc_desc < 0) { - elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path); - lfc_size_limit = 0; /* disable file cache */ - return; - } - } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ @@ -238,10 +290,12 @@ lfc_change_limit_hook(int newval, void *extra) if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) elog(LOG, "Failed to punch hole in file: %m"); #endif - hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); lfc_ctl->used -= 1; } + lfc_ctl->limit = new_size; elog(DEBUG1, "set local file cache limit to %d", new_size); + LWLockRelease(lfc_lock); } @@ -255,6 +309,7 @@ lfc_init(void) if (!process_shared_preload_libraries_in_progress) elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); + 
DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, @@ -315,10 +370,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) BufferTag tag; FileCacheEntry* entry; int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - bool found; + bool found = false; uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -327,8 +382,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + if (LFC_ENABLED()) + { + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + } LWLockRelease(lfc_lock); return found; } @@ -345,7 +403,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -355,6 +413,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); if (!found) @@ -405,7 +470,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) /* * Try to read page from local cache. * Returns true if page is found in local cache. 
- * In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache. + * In case of error, the local file cache is disabled (lfc_ctl->limit is set to zero). */ bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, @@ -420,7 +485,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint64 generation; uint32 entry_offset; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; if (!lfc_ensure_opened()) @@ -432,10 +497,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return false; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) { /* Page is not cached */ + lfc_ctl->misses += 1; LWLockRelease(lfc_lock); return false; } @@ -456,8 +529,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, /* Place entry to the head of LRU list */ LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + if (lfc_ctl->generation == generation) { + Assert(LFC_ENABLED()); + lfc_ctl->hits += 1; Assert(entry->access_count > 0); if (--entry->access_count == 0) dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); @@ -488,8 +564,10 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, bool found; int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); uint32 hash; + uint64 generation; + uint32 entry_offset; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; if (!lfc_ensure_opened()) @@ -497,12 +575,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); - 
CopyNRelFileInfoToBufTag(tag, rinfo); - hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); if (found) @@ -521,13 +604,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * there are should be very large number of concurrent IO operations and them are limited by max_connections, * we prefer not to complicate code and use second approach. */ - if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) + if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru)) { /* Cache overflow: evict least recently used chunk */ FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); Assert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ - hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); elog(DEBUG2, "Swap file cache page"); } else @@ -536,27 +619,140 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ } entry->access_count = 1; + entry->hash = hash; memset(entry->bitmap, 0, sizeof entry->bitmap); } - rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); + generation = lfc_ctl->generation; + entry_offset = entry->offset; + lfc_ctl->writes += 1; + LWLockRelease(lfc_lock); + + rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry_offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); if (rc != BLCKSZ) { - LWLockRelease(lfc_lock); lfc_disable("write"); } else { - /* Place entry to the head of LRU list */ - Assert(entry->access_count > 0); - if (--entry->access_count == 0) - dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + 
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (lfc_ctl->generation == generation) + { + Assert(LFC_ENABLED()); + /* Place entry to the head of LRU list */ + Assert(entry->access_count > 0); + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + + entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31)); + } - entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31)); LWLockRelease(lfc_lock); } } +typedef struct +{ + TupleDesc tupdesc; +} NeonGetStatsCtx; + +#define NUM_NEON_GET_STATS_COLS 2 +#define NUM_NEON_GET_STATS_ROWS 3 + +PG_FUNCTION_INFO_V1(neon_get_lfc_stats); +Datum +neon_get_lfc_stats(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + NeonGetStatsCtx* fctx; + MemoryContext oldcontext; + TupleDesc tupledesc; + Datum result; + HeapTuple tuple; + char const* key; + uint64 value; + Datum values[NUM_NEON_GET_STATS_COLS]; + bool nulls[NUM_NEON_GET_STATS_COLS]; + + if (SRF_IS_FIRSTCALL()) + { + funcctx = SRF_FIRSTCALL_INIT(); + + /* Switch context when allocating stuff to be used in later calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* Create a user function context for cross-call persistence */ + fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx)); + + /* Construct a tuple descriptor for the result rows. 
*/ + tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS); + + TupleDescInitEntry(tupledesc, (AttrNumber) 1, "lfc_key", + TEXTOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 2, "lfc_value", + INT8OID, -1, 0); + + fctx->tupdesc = BlessTupleDesc(tupledesc); + funcctx->max_calls = NUM_NEON_GET_STATS_ROWS; + funcctx->user_fctx = fctx; + + /* Return to original context when allocating transient memory */ + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + + /* Get the saved state */ + fctx = (NeonGetStatsCtx*) funcctx->user_fctx; + + switch (funcctx->call_cntr) + { + case 0: + key = "file_cache_misses"; + if (lfc_ctl) + value = lfc_ctl->misses; + break; + case 1: + key = "file_cache_hits"; + if (lfc_ctl) + value = lfc_ctl->hits; + break; + case 2: + key = "file_cache_used"; + if (lfc_ctl) + value = lfc_ctl->used; + break; + case 3: + key = "file_cache_writes"; + if (lfc_ctl) + value = lfc_ctl->writes; + break; + default: + SRF_RETURN_DONE(funcctx); + } + values[0] = PointerGetDatum(cstring_to_text(key)); + nulls[0] = false; + if (lfc_ctl) + { + nulls[1] = false; + values[1] = Int64GetDatum(value); + } + else + nulls[1] = true; + + tuple = heap_form_tuple(fctx->tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); +} + + +/* + * Function returning data from the local file cache + * relation node/tablespace/database/blocknum and access_counter + */ +PG_FUNCTION_INFO_V1(local_cache_pages); + /* * Record structure holding the to be exposed cache data. 
*/ @@ -580,11 +776,6 @@ typedef struct LocalCachePagesRec *record; } LocalCachePagesContext; -/* - * Function returning data from the local file cache - * relation node/tablespace/database/blocknum and access_counter - */ -PG_FUNCTION_INFO_V1(local_cache_pages); #define NUM_LOCALCACHE_PAGES_ELEM 7 @@ -653,13 +844,15 @@ local_cache_pages(PG_FUNCTION_ARGS) LWLockAcquire(lfc_lock, LW_SHARED); - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (LFC_ENABLED()) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) - n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++) + n_pages += pg_popcount32(entry->bitmap[i]); + } } - hash_seq_term(&status); fctx->record = (LocalCachePagesRec *) MemoryContextAllocHuge(CurrentMemoryContext, sizeof(LocalCachePagesRec) * n_pages); @@ -671,35 +864,33 @@ local_cache_pages(PG_FUNCTION_ARGS) /* Return to original context when allocating transient memory */ MemoryContextSwitchTo(oldcontext); - /* - * Scan through all the buffers, saving the relevant fields in the - * fctx->record structure. - * - * We don't hold the partition locks, so we don't get a consistent - * snapshot across all buffers, but we do grab the buffer header - * locks, so the information of each buffer is self-consistent. - */ - n_pages = 0; - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (n_pages != 0) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + /* + * Scan through all the cache entries, saving the relevant fields in the + * fctx->record structure. 
+ */ + uint32 n = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) { - if (entry->bitmap[i >> 5] & (1 << (i & 31))) + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; - fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].forknum = entry->key.forkNum; - fctx->record[n_pages].blocknum = entry->key.blockNum + i; - fctx->record[n_pages].accesscount = entry->access_count; - n_pages += 1; + if (entry->bitmap[i >> 5] & (1 << (i & 31))) + { + fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; + fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n].forknum = entry->key.forkNum; + fctx->record[n].blocknum = entry->key.blockNum + i; + fctx->record[n].accesscount = entry->access_count; + n += 1; + } } } + Assert(n_pages == n); } - hash_seq_term(&status); - Assert(n_pages == funcctx->max_calls); LWLockRelease(lfc_lock); } diff --git a/pgxn/neon/neon--1.0--1.1.sql b/pgxn/neon/neon--1.0--1.1.sql new file mode 100644 index 0000000000..d1760f7299 --- /dev/null +++ b/pgxn/neon/neon--1.0--1.1.sql @@ -0,0 +1,10 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit + +CREATE FUNCTION neon_get_lfc_stats() +RETURNS SETOF RECORD +AS 'MODULE_PATHNAME', 'neon_get_lfc_stats' +LANGUAGE C PARALLEL SAFE; + +-- Create a view for convenient access. 
+CREATE VIEW neon_lfc_stats AS + SELECT P.* FROM neon_get_lfc_stats() AS P (lfc_key text, lfc_value bigint); diff --git a/pgxn/neon/neon.control b/pgxn/neon/neon.control index 84f79881c1..c110437c3e 100644 --- a/pgxn/neon/neon.control +++ b/pgxn/neon/neon.control @@ -1,4 +1,4 @@ # neon extension comment = 'cloud storage for PostgreSQL' -default_version = '1.0' +default_version = '1.1' module_pathname = '$libdir/neon' diff --git a/test_runner/regress/test_local_file_cache.py b/test_runner/regress/test_local_file_cache.py new file mode 100644 index 0000000000..38f2034c18 --- /dev/null +++ b/test_runner/regress/test_local_file_cache.py @@ -0,0 +1,74 @@ +import os +import random +import threading +import time +from typing import List + +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import query_scalar + + +def test_local_file_cache_unlink(neon_simple_env: NeonEnv): + env = neon_simple_env + + cache_dir = os.path.join(env.repo_dir, "file_cache") + os.mkdir(cache_dir) + + env.neon_cli.create_branch("test_local_file_cache_unlink", "empty") + + endpoint = env.endpoints.create_start( + "test_local_file_cache_unlink", + config_lines=[ + "shared_buffers='1MB'", + f"neon.file_cache_path='{cache_dir}/file.cache'", + "neon.max_file_cache_size='64MB'", + "neon.file_cache_size_limit='10MB'", + ], + ) + + cur = endpoint.connect().cursor() + + n_rows = 100000 + n_threads = 20 + n_updates_per_thread = 10000 + n_updates_per_connection = 1000 + n_total_updates = n_threads * n_updates_per_thread + + cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)") + cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g") + + # Start threads that will perform random UPDATEs. Each UPDATE + # increments the counter on the row, so that we can check at the + # end that the sum of all the counters match the number of updates + # performed (plus the initial 1 on each row). 
+ # + # Furthermore, each thread will reconnect between every 1000 updates. + def run_updates(): + n_updates_performed = 0 + conn = endpoint.connect() + cur = conn.cursor() + for _ in range(n_updates_per_thread): + id = random.randint(1, n_rows) + cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}") + n_updates_performed += 1 + if n_updates_performed % n_updates_per_connection == 0: + cur.close() + conn.close() + conn = endpoint.connect() + cur = conn.cursor() + + threads: List[threading.Thread] = [] + for _i in range(n_threads): + thread = threading.Thread(target=run_updates, args=(), daemon=True) + thread.start() + threads.append(thread) + + time.sleep(5) + + new_cache_dir = os.path.join(env.repo_dir, "file_cache_new") + os.rename(cache_dir, new_cache_dir) + + for thread in threads: + thread.join() + + assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows