From 2324e5a3d37f9944ba46d794c01563ecb02b2b04 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 8 Oct 2023 15:02:42 +0300 Subject: [PATCH] Rebased version of LFC fixes --- pgxn/neon/file_cache.c | 256 +++++++++++++------ pgxn/neon/neon--1.0.sql | 10 + test_runner/regress/test_local_file_cache.py | 74 ++++++ 3 files changed, 257 insertions(+), 83 deletions(-) create mode 100644 test_runner/regress/test_local_file_cache.py diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 22f20a4c0b..ecd9ff6437 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -65,6 +65,7 @@ typedef struct FileCacheEntry { BufferTag key; + uint32 hash; uint32 offset; uint32 access_count; uint32 bitmap[BLOCKS_PER_CHUNK/32]; @@ -76,6 +77,9 @@ typedef struct FileCacheControl uint64 generation; /* generation is needed to handle correct hash reenabling */ uint32 size; /* size of cache file in chunks */ uint32 used; /* number of used chunks */ + uint32 limit; /* shared copy of lfc_size_limit */ + uint64 hits; + uint64 misses; dlist_head lru; /* double linked list for LRU replacement algorithm */ } FileCacheControl; @@ -91,7 +95,9 @@ static shmem_startup_hook_type prev_shmem_startup_hook; static shmem_request_hook_type prev_shmem_request_hook; #endif -void FileCacheMonitorMain(Datum main_arg); +#define LFC_ENABLED() (lfc_ctl->limit != 0) + +void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg); /* * Local file cache is mandatory and Neon can work without it. @@ -102,49 +108,68 @@ void FileCacheMonitorMain(Datum main_arg); static void lfc_disable(char const* op) { - HASH_SEQ_STATUS status; - FileCacheEntry* entry; - elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + /* Invalidate hash */ + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (LFC_ENABLED()) + { + HASH_SEQ_STATUS status; + FileCacheEntry* entry; + + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL); + } + lfc_ctl->generation += 1; + lfc_ctl->size = 0; + lfc_ctl->used = 0; + lfc_ctl->limit = 0; + dlist_init(&lfc_ctl->lru); + + if (lfc_desc > 0) + { + /* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */ + int rc = ftruncate(lfc_desc, 0); + if (rc < 0) + elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); + } + } + LWLockRelease(lfc_lock); + + if (lfc_desc > 0) close(lfc_desc); lfc_desc = -1; - lfc_size_limit = 0; +} - /* Invalidate hash */ - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) - { - hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL); - memset(entry->bitmap, 0, sizeof entry->bitmap); - } - hash_seq_term(&status); - lfc_ctl->generation += 1; - lfc_ctl->size = 0; - lfc_ctl->used = 0; - dlist_init(&lfc_ctl->lru); - - LWLockRelease(lfc_lock); +/* + * This check is done without obtaining lfc_lock, so it is unreliable + */ +static bool +lfc_maybe_disabled(void) +{ + return !lfc_ctl || !LFC_ENABLED(); } static bool lfc_ensure_opened(void) { + bool enabled = !lfc_maybe_disabled(); /* Open cache file if not done yet */ - if (lfc_desc <= 0) + if (lfc_desc <= 0 && enabled) { - lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); + lfc_desc = BasicOpenFile(lfc_path, O_RDWR); if (lfc_desc < 0) { lfc_disable("open"); return false; } } - return true; + return enabled; } static void @@ -163,6 +188,7 @@ lfc_shmem_startup(void) lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); if (!found) { + int fd; uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock"); info.keysize = sizeof(BufferTag); @@ -175,10 +201,22 @@ lfc_shmem_startup(void) lfc_ctl->generation = 0; lfc_ctl->size = 0; lfc_ctl->used = 0; + lfc_ctl->hits = 0; + lfc_ctl->misses = 0; dlist_init(&lfc_ctl->lru); - /* Remove file cache on restart */ - (void)unlink(lfc_path); + /* Recreate file cache on restart */ + fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC); + if (fd < 0) + { + elog(WARNING, "Failed to create local file cache %s: %m", lfc_path); + lfc_ctl->limit = 0; + } + else + { + close(fd); + lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit); + } } LWLockRelease(AddinShmemInitLock); } @@ -195,6 +233,17 @@ lfc_shmem_request(void) RequestNamedLWLockTranche("lfc_lock", 1); } +static bool +is_normal_backend(void) +{ + /* + * Stats collector detach shared memory, so we should not try to access shared memory here. + * Parallel workers first assign default value (0), so not perform truncation in parallel workers. + * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. + */ + return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker(); +} + static bool lfc_check_limit_hook(int *newval, void **extra, GucSource source) { @@ -210,25 +259,15 @@ static void lfc_change_limit_hook(int newval, void *extra) { uint32 new_size = SIZE_MB_TO_CHUNKS(newval); - /* - * Stats collector detach shared memory, so we should not try to access shared memory here. - * Parallel workers first assign default value (0), so not perform truncation in parallel workers. - * The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC. - */ - if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker()) + + if (!is_normal_backend()) + return; + + if (!lfc_ensure_opened()) return; - /* Open cache file if not done yet */ - if (lfc_desc <= 0) - { - lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); - if (lfc_desc < 0) { - elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path); - lfc_size_limit = 0; /* disable file cache */ - return; - } - } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ @@ -238,10 +277,12 @@ lfc_change_limit_hook(int newval, void *extra) if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) elog(LOG, "Failed to punch hole in file: %m"); #endif - hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); lfc_ctl->used -= 1; } + lfc_ctl->limit = new_size; elog(DEBUG1, "set local file cache limit to %d", new_size); + LWLockRelease(lfc_lock); } @@ -255,6 +296,7 @@ lfc_init(void) if (!process_shared_preload_libraries_in_progress) elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); + DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, @@ -315,10 +357,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) BufferTag tag; FileCacheEntry* entry; int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); - bool found; + bool found = false; uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -327,8 +369,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + if (LFC_ENABLED()) + { + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + } LWLockRelease(lfc_lock); return found; } @@ -345,7 +390,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; CopyNRelFileInfoToBufTag(tag, rinfo); @@ -355,6 +400,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); if (!found) @@ -405,7 +457,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) /* * Try to read page from local cache. * Returns true if page is found in local cache. - * In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache. + * In case of error local file cache is disabled (lfc->limit is set to zero). */ bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, @@ -420,7 +472,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint64 generation; uint32 entry_offset; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; if (!lfc_ensure_opened()) @@ -432,10 +484,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return false; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) { /* Page is not cached */ + lfc_ctl->misses += 1; /* race condition here, but precise value is not needed */ LWLockRelease(lfc_lock); return false; } @@ -456,8 +516,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, /* Place entry to the head of LRU list */ LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + if (lfc_ctl->generation == generation) { + Assert(LFC_ENABLED()); + lfc_ctl->hits += 1; Assert(entry->access_count > 0); if (--entry->access_count == 0) dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); @@ -489,7 +552,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); uint32 hash; - if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; if (!lfc_ensure_opened()) @@ -497,12 +560,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); - CopyNRelFileInfoToBufTag(tag, rinfo); - hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); if (found) @@ -521,13 +589,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * there are should be very large number of concurrent IO operations and them are limited by max_connections, * we prefer not to complicate code and use second approach. */ - if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) + if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru)) { /* Cache overflow: evict least recently used chunk */ FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); Assert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ - hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); elog(DEBUG2, "Swap file cache page"); } else @@ -536,6 +604,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ } entry->access_count = 1; + entry->hash = hash; memset(entry->bitmap, 0, sizeof entry->bitmap); } @@ -580,6 +649,23 @@ typedef struct LocalCachePagesRec *record; } LocalCachePagesContext; + +PG_FUNCTION_INFO_V1(local_cache_hits); +Datum +local_cache_hits(PG_FUNCTION_ARGS) +{ + PG_RETURN_INT64(lfc_ctl ? lfc_ctl->hits : -1); +} + + +PG_FUNCTION_INFO_V1(local_cache_misses); +Datum +local_cache_misses(PG_FUNCTION_ARGS) +{ + PG_RETURN_INT64(lfc_ctl ? lfc_ctl->misses : -1); +} + + /* * Function returning data from the local file cache * relation node/tablespace/database/blocknum and access_counter @@ -653,13 +739,15 @@ local_cache_pages(PG_FUNCTION_ARGS) LWLockAcquire(lfc_lock, LW_SHARED); - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (LFC_ENABLED()) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) - n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + } } - hash_seq_term(&status); fctx->record = (LocalCachePagesRec *) MemoryContextAllocHuge(CurrentMemoryContext, sizeof(LocalCachePagesRec) * n_pages); @@ -671,35 +759,37 @@ local_cache_pages(PG_FUNCTION_ARGS) /* Return to original context when allocating transient memory */ MemoryContextSwitchTo(oldcontext); - /* - * Scan through all the buffers, saving the relevant fields in the - * fctx->record structure. - * - * We don't hold the partition locks, so we don't get a consistent - * snapshot across all buffers, but we do grab the buffer header - * locks, so the information of each buffer is self-consistent. - */ - n_pages = 0; - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + if (n_pages != 0) { - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + /* + * Scan through all the buffers, saving the relevant fields in the + * fctx->record structure. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header + * locks, so the information of each buffer is self-consistent. + */ + n_pages = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) { - if (entry->bitmap[i >> 5] & (1 << (i & 31))) + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; - fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n_pages].forknum = entry->key.forkNum; - fctx->record[n_pages].blocknum = entry->key.blockNum + i; - fctx->record[n_pages].accesscount = entry->access_count; - n_pages += 1; + if (entry->bitmap[i >> 5] & (1 << (i & 31))) + { + fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; + fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); + fctx->record[n_pages].forknum = entry->key.forkNum; + fctx->record[n_pages].blocknum = entry->key.blockNum + i; + fctx->record[n_pages].accesscount = entry->access_count; + n_pages += 1; + } } } + Assert(n_pages == funcctx->max_calls); } - hash_seq_term(&status); - Assert(n_pages == funcctx->max_calls); LWLockRelease(lfc_lock); } diff --git a/pgxn/neon/neon--1.0.sql b/pgxn/neon/neon--1.0.sql index 6cf111ea6a..81f68dafef 100644 --- a/pgxn/neon/neon--1.0.sql +++ b/pgxn/neon/neon--1.0.sql @@ -22,6 +22,16 @@ AS 'MODULE_PATHNAME', 'backpressure_throttling_time' LANGUAGE C STRICT PARALLEL UNSAFE; +CREATE FUNCTION local_cache_hits() RETURNS bigint +AS 'MODULE_PATHNAME', 'local_cache_hits' +LANGUAGE C PARALLEL SAFE; + + +CREATE FUNCTION local_cache_misses() RETURNS bigint +AS 'MODULE_PATHNAME', 'local_cache_misses' +LANGUAGE C PARALLEL SAFE; + + CREATE FUNCTION local_cache_pages() RETURNS SETOF RECORD AS 'MODULE_PATHNAME', 'local_cache_pages' diff --git a/test_runner/regress/test_local_file_cache.py b/test_runner/regress/test_local_file_cache.py new file mode 100644 index 0000000000..38f2034c18 --- /dev/null +++ b/test_runner/regress/test_local_file_cache.py @@ -0,0 +1,74 @@ +import os +import random +import threading +import time +from typing import List + +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import query_scalar + + +def test_local_file_cache_unlink(neon_simple_env: NeonEnv): + env = neon_simple_env + + cache_dir = os.path.join(env.repo_dir, "file_cache") + os.mkdir(cache_dir) + + env.neon_cli.create_branch("test_local_file_cache_unlink", "empty") + + endpoint = env.endpoints.create_start( + "test_local_file_cache_unlink", + config_lines=[ + "shared_buffers='1MB'", + f"neon.file_cache_path='{cache_dir}/file.cache'", + "neon.max_file_cache_size='64MB'", + "neon.file_cache_size_limit='10MB'", + ], + ) + + cur = endpoint.connect().cursor() + + n_rows = 100000 + n_threads = 20 + n_updates_per_thread = 10000 + n_updates_per_connection = 1000 + n_total_updates = n_threads * n_updates_per_thread + + cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)") + cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g") + + # Start threads that will perform random UPDATEs. Each UPDATE + # increments the counter on the row, so that we can check at the + # end that the sum of all the counters match the number of updates + # performed (plus the initial 1 on each row). + # + # Furthermore, each thread will reconnect between every 1000 updates. + def run_updates(): + n_updates_performed = 0 + conn = endpoint.connect() + cur = conn.cursor() + for _ in range(n_updates_per_thread): + id = random.randint(1, n_rows) + cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}") + n_updates_performed += 1 + if n_updates_performed % n_updates_per_connection == 0: + cur.close() + conn.close() + conn = endpoint.connect() + cur = conn.cursor() + + threads: List[threading.Thread] = [] + for _i in range(n_threads): + thread = threading.Thread(target=run_updates, args=(), daemon=True) + thread.start() + threads.append(thread) + + time.sleep(5) + + new_cache_dir = os.path.join(env.repo_dir, "file_cache_new") + os.rename(cache_dir, new_cache_dir) + + for thread in threads: + thread.join() + + assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows