diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 1894e8c72a..e9697c9c26 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -25,6 +25,7 @@ #include "funcapi.h" #include "miscadmin.h" #include "pagestore_client.h" +#include "common/relpath.h" #include "common/hashfn.h" #include "pgstat.h" #include "postmaster/bgworker.h" @@ -51,6 +52,24 @@ * * Cache is always reconstructed at node startup, so we do not need to save mapping somewhere and worry about * its consistency. + * + * ## Holes + * + * The LFC can be resized on the fly, up to a maximum size that's determined + * at server startup (neon.max_file_cache_size). After server startup, we + * expand the underlying file when needed, until it reaches the soft limit + * (neon.file_cache_size_limit). If the soft limit is later reduced, we shrink + * the LFC by punching holes in the underlying file with a + * fallocate(FALLOC_FL_PUNCH_HOLE) call. The nominal size of the file doesn't + * shrink, but the disk space it uses does. + * + * Each hole is tracked by a dummy FileCacheEntry, which are kept in the + * 'holes' linked list. They are entered into the chunk hash table, with a + * special key where the blockNumber is used to store the 'offset' of the + * hole, and all other fields are zero. Holes are never looked up in the hash + * table, we only enter them there to have a FileCacheEntry that we can keep + * in the linked list. If the soft limit is raised again, we reuse the holes + * before extending the nominal size of file. */ /* Local file storage allocation chunk. @@ -87,6 +106,7 @@ typedef struct FileCacheControl uint64 writes; dlist_head lru; /* double linked list for LRU replacement * algorithm */ + dlist_head holes; HyperLogLogState wss_estimation; /* estimation of working set size */ } FileCacheControl; @@ -135,6 +155,7 @@ lfc_disable(char const *op) lfc_ctl->used = 0; lfc_ctl->limit = 0; dlist_init(&lfc_ctl->lru); + dlist_init(&lfc_ctl->holes); if (lfc_desc > 0) { @@ -311,13 +332,31 @@ lfc_change_limit_hook(int newval, void *extra) * returning their space to file system */ FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + FileCacheEntry *hole; + uint32 offset = victim->offset; + uint32 hash; + bool found; + BufferTag holetag; Assert(victim->access_count == 0); #ifdef FALLOC_FL_PUNCH_HOLE if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) neon_log(LOG, "Failed to punch hole in file: %m"); #endif + + /* We remove the old entry, and re-enter a hole to the hash table */ hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); + + memset(&holetag, 0, sizeof(holetag)); + holetag.blockNum = offset; + hash = get_hash_value(lfc_hash, &holetag); + hole = hash_search_with_hash_value(lfc_hash, &holetag, hash, HASH_ENTER, &found); + hole->hash = hash; + hole->offset = offset; + hole->access_count = 0; + Assert(!found); + dlist_push_head(&lfc_ctl->holes, &hole->lru_node); + lfc_ctl->used -= 1; } lfc_ctl->limit = new_size; @@ -439,7 +478,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; tag.blockNum = (blkno & ~(BLOCKS_PER_CHUNK - 1)); - + Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -525,6 +564,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -613,6 +653,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); CopyNRelFileInfoToBufTag(tag, rinfo); + Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -662,6 +703,19 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); neon_log(DEBUG2, "Swap file cache page"); } + else if (!dlist_is_empty(&lfc_ctl->holes)) + { + /* We can reuse a hole that was left behind when the LFC was shrink previously */ + FileCacheEntry *hole = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->holes)); + uint32 offset = hole->offset; + bool found; + + hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found); + Assert(found); + + lfc_ctl->used += 1; + entry->offset = offset; /* reuse the hole */ + } else { lfc_ctl->used += 1; diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index f19732cbbb..addb6ccce6 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -54,6 +54,10 @@ #define BufTagGetNRelFileInfo(tag) tag.rnode +#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode) + +#define InvalidRelFileNumber InvalidOid + #define SMgrRelGetRelInfo(reln) \ (reln->smgr_rnode.node) diff --git a/test_runner/regress/test_lfc_resize.py b/test_runner/regress/test_lfc_resize.py index 2a3442448a..1b2c7f808f 100644 --- a/test_runner/regress/test_lfc_resize.py +++ b/test_runner/regress/test_lfc_resize.py @@ -1,3 +1,7 @@ +import os +import random +import re +import subprocess import threading import time @@ -17,17 +21,17 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): "test_lfc_resize", config_lines=[ "neon.file_cache_path='file.cache'", - "neon.max_file_cache_size=1GB", - "neon.file_cache_size_limit=1GB", + "neon.max_file_cache_size=512MB", + "neon.file_cache_size_limit=512MB", ], ) n_resize = 10 - scale = 10 + scale = 100 def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) - pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr]) + pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr]) thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) thread.start() @@ -35,9 +39,21 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): conn = endpoint.connect() cur = conn.cursor() - for i in range(n_resize): - cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'") + for _ in range(n_resize): + size = random.randint(1, 512) + cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'") cur.execute("select pg_reload_conf()") time.sleep(1) + cur.execute("alter system set neon.file_cache_size_limit='100MB'") + cur.execute("select pg_reload_conf()") + thread.join() + + lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" + lfc_file_size = os.path.getsize(lfc_file_path) + res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True) + lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] + log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") + assert lfc_file_size <= 512 * 1024 * 1024 + assert int(lfc_file_blocks) <= 128 * 1024