diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 4be75e1dad..fe752740bf 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -83,12 +83,14 @@ typedef struct FileCacheControl uint64 generation; /* generation is needed to handle correct hash reenabling */ uint32 size; /* size of cache file in chunks */ uint32 used; /* number of used chunks */ + bool enabled; /* local file cache state */ dlist_head lru; /* double linked list for LRU replacement algorithm */ } FileCacheControl; static HTAB* lfc_hash; static int lfc_desc = 0; static LWLockId lfc_lock; +static bool lfc_enabled; static int lfc_max_size; static int lfc_size_limit; static int lfc_free_space_watermark; @@ -111,40 +113,51 @@ void FileCacheMonitorMain(Datum main_arg); static void lfc_disable(char const* op) { - HASH_SEQ_STATUS status; - FileCacheEntry* entry; - - elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + if (op) + elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + else + elog(LOG, "Disable local file cache"); if (lfc_desc > 0) close(lfc_desc); lfc_desc = -1; - lfc_size_limit = 0; /* Invalidate hash */ LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (lfc_ctl->enabled) { - hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL); - memset(entry->bitmap, 0, sizeof entry->bitmap); - } - hash_seq_term(&status); - lfc_ctl->generation += 1; - lfc_ctl->size = 0; - lfc_ctl->used = 0; - dlist_init(&lfc_ctl->lru); + HASH_SEQ_STATUS status; + FileCacheEntry* entry; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL); + memset(entry->bitmap, 0, sizeof entry->bitmap); + } + lfc_ctl->generation += 1; + lfc_ctl->size = 0; + lfc_ctl->used = 0; + lfc_ctl->enabled = false; + dlist_init(&lfc_ctl->lru); + } 
LWLockRelease(lfc_lock); } +static bool +lfc_is_enabled(void) +{ + return lfc_ctl && lfc_ctl->enabled; +} + static bool lfc_ensure_opened(void) { + bool enabled = lfc_is_enabled(); /* Open cache file if not done yet */ - if (lfc_desc <= 0) + if (lfc_desc <= 0 && enabled) { lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); @@ -153,7 +166,7 @@ lfc_ensure_opened(void) return false; } } - return true; + return enabled; } static void @@ -184,6 +197,7 @@ lfc_shmem_startup(void) lfc_ctl->generation = 0; lfc_ctl->size = 0; lfc_ctl->used = 0; + lfc_ctl->enabled = lfc_enabled; dlist_init(&lfc_ctl->lru); /* Remove file cache on restart */ @@ -204,6 +218,39 @@ lfc_shmem_request(void) RequestNamedLWLockTranche("lfc_lock", 1); } +static bool +is_normal_backend(void) +{ + /* + * Stats collector detach shared memory, so we should not try to access shared memory here. + * Parallel workers first assign default value (0), so not perform truncation in parallel workers. + * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. + */ + return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker(); +} + +static void +lfc_change_state(bool newval, void *extra) +{ + if (!is_normal_backend()) + return; + + if (!newval) + { + if (lfc_ctl) + lfc_disable(NULL); + else + elog(LOG, "Local file cache disabled"); + } + else + { + elog(LOG, "Local file cache enabled"); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_ctl->enabled = newval; + LWLockRelease(lfc_lock); + } +} + static bool lfc_check_limit_hook(int *newval, void **extra, GucSource source) { @@ -219,25 +266,20 @@ static void lfc_change_limit_hook(int newval, void *extra) { uint32 new_size = SIZE_MB_TO_CHUNKS(newval); - /* - * Stats collector detach shared memory, so we should not try to access shared memory here. - * Parallel workers first assign default value (0), so not perform truncation in parallel workers. 
- * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. - */ - if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker()) + + if (!is_normal_backend()) + return; + + if (!lfc_ensure_opened()) return; - /* Open cache file if not done yet */ - if (lfc_desc <= 0) - { - lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); - if (lfc_desc < 0) { - elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path); - lfc_size_limit = 0; /* disable file cache */ - return; - } - } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!lfc_ctl->enabled) + { + LWLockRelease(lfc_lock); + return; + } while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ @@ -251,6 +293,7 @@ lfc_change_limit_hook(int newval, void *extra) lfc_ctl->used -= 1; } elog(DEBUG1, "set local file cache limit to %d", new_size); + LWLockRelease(lfc_lock); } @@ -286,7 +329,7 @@ FileCacheMonitorMain(Datum main_arg) /* Periodically dump buffers until terminated. 
*/ while (!ShutdownRequestPending) { - if (lfc_size_limit != 0) + if (lfc_is_enabled()) { struct statvfs sfs; if (statvfs(lfc_path, &sfs) < 0) @@ -338,6 +381,17 @@ lfc_init(void) if (!process_shared_preload_libraries_in_progress) elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); + DefineCustomBoolVariable("neon.local_file_cache", + "Enable or disable local file cache", + NULL, + &lfc_enabled, + true, /* enabled by default */ + PGC_SIGHUP, + 0, + NULL, + lfc_change_state, + NULL); + DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, &lfc_max_size, @@ -414,10 +468,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) BufferTag tag; FileCacheEntry* entry; int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - bool found; + bool found = false; uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (!lfc_is_enabled()) /* fast exit if file cache is disabled */ return false; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -426,8 +480,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + if (lfc_ctl->enabled) + { + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + } LWLockRelease(lfc_lock); return found; } @@ -444,7 +501,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (!lfc_is_enabled()) /* fast exit if file cache is disabled */ return; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -454,6 +511,13 @@ lfc_evict(NRelFileInfo
rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!lfc_ctl->enabled) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); if (!found) @@ -519,7 +583,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint64 generation; uint32 entry_offset; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (!lfc_is_enabled()) /* fast exit if file cache is disabled */ return false; if (!lfc_ensure_opened()) @@ -531,6 +595,13 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!lfc_ctl->enabled) + { + LWLockRelease(lfc_lock); + return false; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) { @@ -555,8 +626,10 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, /* Place entry to the head of LRU list */ LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + if (lfc_ctl->generation == generation) { + Assert(lfc_ctl->enabled); Assert(entry->access_count > 0); if (--entry->access_count == 0) dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); @@ -591,17 +664,22 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ return; - if (!lfc_ensure_opened()) + if (!lfc_is_enabled()) /* fast exit if file cache is disabled */ return; tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); - CopyNRelFileInfoToBufTag(tag, rinfo); - hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!lfc_ctl->enabled) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); if (found) @@ -752,13 
+830,15 @@ local_cache_pages(PG_FUNCTION_ARGS) LWLockAcquire(lfc_lock, LW_SHARED); - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (lfc_ctl->enabled) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) - n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + } } - hash_seq_term(&status); fctx->record = (LocalCachePagesRec *) MemoryContextAllocHuge(CurrentMemoryContext, sizeof(LocalCachePagesRec) * n_pages); @@ -770,35 +850,37 @@ local_cache_pages(PG_FUNCTION_ARGS) /* Return to original context when allocating transient memory */ MemoryContextSwitchTo(oldcontext); - /* - * Scan through all the buffers, saving the relevant fields in the - * fctx->record structure. - * - * We don't hold the partition locks, so we don't get a consistent - * snapshot across all buffers, but we do grab the buffer header - * locks, so the information of each buffer is self-consistent. - */ - n_pages = 0; - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (n_pages != 0) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + /* + * Scan through all the buffers, saving the relevant fields in the + * fctx->record structure. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header + * locks, so the information of each buffer is self-consistent. 
+ */ + n_pages = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) { - if (entry->bitmap[i >> 5] & (1 << (i & 31))) + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; - fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].forknum = entry->key.forkNum; - fctx->record[n_pages].blocknum = entry->key.blockNum + i; - fctx->record[n_pages].accesscount = entry->access_count; - n_pages += 1; + if (entry->bitmap[i >> 5] & (1 << (i & 31))) + { + fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; + fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].forknum = entry->key.forkNum; + fctx->record[n_pages].blocknum = entry->key.blockNum + i; + fctx->record[n_pages].accesscount = entry->access_count; + n_pages += 1; + } } } + Assert(n_pages == funcctx->max_calls); } - hash_seq_term(&status); - Assert(n_pages == funcctx->max_calls); LWLockRelease(lfc_lock); } diff --git a/test_runner/regress/test_local_file_cache.py b/test_runner/regress/test_local_file_cache.py new file mode 100644 index 0000000000..c67df22321 --- /dev/null +++ b/test_runner/regress/test_local_file_cache.py @@ -0,0 +1,73 @@ +import os +import pathlib +import random +import signal +import threading +import time + +from psycopg2.extensions import connection as PgConnection + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content +from 
fixtures.pg_version import PgVersion +from fixtures.utils import query_scalar + +def test_local_file_cache_unlink(neon_simple_env: NeonEnv): + env = neon_simple_env + env.neon_cli.create_branch("test_local_file_cache_unlink", "empty") + + endpoint = env.endpoints.create_start( + "test_local_file_cache_unlink", + config_lines=[ + "shared_buffers='1MB'", + "neon.max_file_cache_size='64MB'", + "neon.file_cache_size_limit='10MB'", + ], + ) + + cur = endpoint.connect().cursor() + + n_rows = 100000 + n_threads = 20 + n_updates_per_thread = 10000 + n_updates_per_connection = 1000 + n_total_updates = n_threads * n_updates_per_thread + + cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)") + cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g"); + + # Start threads that will perform random UPDATEs. Each UPDATE + # increments the counter on the row, so that we can check at the + # end that the sum of all the counters match the number of updates + # performed (plus the initial 1 on each row). + # + # Furthermore, each thread will reconnect between every 1000 updates. 
+ def run_updates(): + n_updates_performed = 0 + conn = endpoint.connect() + cur = conn.cursor() + for _ in range(n_updates_per_thread): + id = random.randint(1, n_rows) + cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}"); + n_updates_performed += 1 + if n_updates_performed % n_updates_per_connection == 0: + cur.close() + conn.close() + conn = endpoint.connect() + cur = conn.cursor() + + threads: List[threading.Thread] = [] + for i in range(n_threads): + thread = threading.Thread(target=run_updates, args=(), daemon=True) + thread.start() + threads.append(thread) + + time.sleep(5) + + os.unlink(os.path.join(endpoint.pg_data_dir_path(), "file.cache")) + + for thread in threads: + thread.join() + + assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 026d6b093d..b4ec091547 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 026d6b093d49e25cec44dd04598152329ceac027 +Subproject commit b4ec091547ebdc900fb07b1c92ed493dfab0c410