Track holes as dummy FileCacheEntrys

Heikki Linnakangas
2024-08-05 21:05:40 +03:00
parent bd845c7587
commit b1e0dbc447
3 changed files with 81 additions and 7 deletions


@@ -25,6 +25,7 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/relpath.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
@@ -51,6 +52,24 @@
*
* Cache is always reconstructed at node startup, so we do not need to save mapping somewhere and worry about
* its consistency.
*
* ## Holes
*
* The LFC can be resized on the fly, up to a maximum size that's determined
* at server startup (neon.max_file_cache_size). After server startup, we
* expand the underlying file when needed, until it reaches the soft limit
* (neon.file_cache_size_limit). If the soft limit is later reduced, we shrink
* the LFC by punching holes in the underlying file with a
* fallocate(FALLOC_FL_PUNCH_HOLE) call. The nominal size of the file doesn't
* shrink, but the disk space it uses does.
*
* Each hole is tracked by a dummy FileCacheEntry, kept in the 'holes' linked
* list. The dummy entries are also entered into the chunk hash table, with a
* special key where the blockNumber stores the 'offset' of the hole and all
* other fields are zero. Holes are never looked up in the hash table; we only
* enter them there to have a FileCacheEntry that we can keep in the linked
* list. If the soft limit is raised again, we reuse the holes before
* extending the nominal size of the file.
*/
/* Local file storage allocation chunk.
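
The comment above relies on a property of hole punching that is easy to verify in isolation: fallocate(FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE) leaves the file's nominal size (st_size) unchanged while the blocks actually allocated on disk (st_blocks) shrink. Below is a minimal standalone sketch, separate from the patch; it assumes Linux, a filesystem that supports hole punching (e.g. ext4), and 8 KB blocks with 128 blocks per chunk, which are the usual BLCKSZ / BLOCKS_PER_CHUNK values but are not taken from this diff.

/* standalone demo, not part of the patch */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#define DEMO_CHUNK (128 * 8192)		/* one assumed LFC chunk: 128 blocks of 8 KB */

int
main(void)
{
	char		buf[DEMO_CHUNK];
	struct stat st;
	int			fd = open("demo.cache", O_RDWR | O_CREAT | O_TRUNC, 0600);

	if (fd < 0) { perror("open"); return 1; }

	/* write two chunks of real data */
	memset(buf, 'x', sizeof(buf));
	if (write(fd, buf, sizeof(buf)) != (ssize_t) sizeof(buf) ||
		write(fd, buf, sizeof(buf)) != (ssize_t) sizeof(buf))
	{
		perror("write");
		return 1;
	}

	fstat(fd, &st);
	printf("before: size=%lld blocks=%lld\n",
		   (long long) st.st_size, (long long) st.st_blocks);

	/* punch a hole over the first chunk, keeping the nominal size */
	if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, 0, DEMO_CHUNK) < 0)
		perror("fallocate");

	fstat(fd, &st);
	printf("after:  size=%lld blocks=%lld\n",
		   (long long) st.st_size, (long long) st.st_blocks);	/* st_blocks is in 512-byte units */

	close(fd);
	unlink("demo.cache");
	return 0;
}

On ext4 this typically prints the same st_size before and after, with roughly half the allocated blocks afterwards, which is exactly the "nominal size doesn't shrink, but the disk space it uses does" behaviour the LFC depends on.
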
@@ -87,6 +106,7 @@ typedef struct FileCacheControl
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes;
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
@@ -135,6 +155,7 @@ lfc_disable(char const *op)
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
if (lfc_desc > 0)
{
@@ -311,13 +332,31 @@ lfc_change_limit_hook(int newval, void *extra)
* returning their space to file system
*/
FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
FileCacheEntry *hole;
uint32 offset = victim->offset;
uint32 hash;
bool found;
BufferTag holetag;
Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and enter a hole into the hash table in its place */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
memset(&holetag, 0, sizeof(holetag));
holetag.blockNum = offset;
hash = get_hash_value(lfc_hash, &holetag);
hole = hash_search_with_hash_value(lfc_hash, &holetag, hash, HASH_ENTER, &found);
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
Assert(!found);
dlist_push_head(&lfc_ctl->holes, &hole->lru_node);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
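
To illustrate the key encoding used above, here is a small standalone sketch with assumed field names, not the real BufferTag layout: a hole's key is all zeroes except blockNum, which records the chunk's offset in the cache file. Every tag that is actually looked up carries a valid (non-zero) relation number, which appears to be what the new Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber) checks enforce, so such a dummy key should never clash with a real cache entry.

/* standalone demo, not part of the patch; field names are assumed */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct HoleDemoTag
{
	uint32_t	spcOid;			/* tablespace OID; 0 for a hole */
	uint32_t	dbOid;			/* database OID; 0 for a hole */
	uint32_t	relNumber;		/* relation file number; 0 (InvalidOid) for a hole */
	int32_t		forkNum;		/* fork number; 0 for a hole */
	uint32_t	blockNum;		/* for a hole: offset of the punched chunk in the file */
} HoleDemoTag;

static HoleDemoTag
make_hole_key(uint32_t chunk_offset)
{
	HoleDemoTag tag;

	memset(&tag, 0, sizeof(tag));	/* all relation-identifying fields invalid */
	tag.blockNum = chunk_offset;	/* only the chunk's offset is stored */
	return tag;
}

int
main(void)
{
	HoleDemoTag hole = make_hole_key(42);

	printf("hole key: relNumber=%u blockNum(offset)=%u\n",
		   hole.relNumber, hole.blockNum);
	return 0;
}
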
@@ -439,7 +478,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = (blkno & ~(BLOCKS_PER_CHUNK - 1));
Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -525,6 +564,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -613,6 +653,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CopyNRelFileInfoToBufTag(tag, rinfo);
Assert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -662,6 +703,19 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found);
Assert(found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
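
Putting the pieces together, lfc_write now has three ways to find space for a chunk: recycle an evicted LRU victim's offset when the cache is at its limit, otherwise fill a hole left behind by an earlier shrink, and only then extend the nominal file size. The sketch below is a simplified, self-contained model with invented names, not the patch's code; it only shows the order of preference.

/* standalone demo, not part of the patch */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct DemoCache
{
	uint32_t	used;				/* chunks currently in use */
	uint32_t	limit;				/* soft limit, in chunks */
	uint32_t	next_free_offset;	/* where the file would be extended next */
	uint32_t	lru_victim;			/* offset of an evictable chunk, if any */
	bool		have_lru_victim;
	uint32_t	holes[8];			/* offsets of punched holes (LIFO) */
	int			nholes;
} DemoCache;

static uint32_t
choose_chunk_offset(DemoCache *c)
{
	if (c->used >= c->limit && c->have_lru_victim)
	{
		c->have_lru_victim = false;
		return c->lru_victim;			/* at the limit: recycle an evicted chunk */
	}
	if (c->nholes > 0)
	{
		c->used++;
		return c->holes[--c->nholes];	/* fill a hole from an earlier shrink */
	}
	c->used++;
	return c->next_free_offset++;		/* no holes left: grow the nominal file */
}

int
main(void)
{
	DemoCache	c = {.used = 2, .limit = 8, .next_free_offset = 4,
					 .have_lru_victim = false, .holes = {1, 2}, .nholes = 2};
	uint32_t	a = choose_chunk_offset(&c);
	uint32_t	b = choose_chunk_offset(&c);
	uint32_t	d = choose_chunk_offset(&c);

	printf("%u %u %u\n", a, b, d);		/* prints "2 1 4": holes first, then extend */
	return 0;
}

With two holes recorded, the first two allocations return the hole offsets and only the third grows the file, mirroring how the holes list is drained before the LFC file's nominal size increases.
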


@@ -54,6 +54,10 @@
#define BufTagGetNRelFileInfo(tag) tag.rnode
#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode)
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)


@@ -1,3 +1,7 @@
import os
import random
import re
import subprocess
import threading
import time
@@ -17,17 +21,17 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
n_resize = 10
scale = 10
scale = 100
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
@@ -35,9 +39,21 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
conn = endpoint.connect()
cur = conn.cursor()
for i in range(n_resize):
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
for _ in range(n_resize):
size = random.randint(1, 512)
cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
cur.execute("alter system set neon.file_cache_size_limit='100MB'")
cur.execute("select pg_reload_conf()")
thread.join()
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
assert lfc_file_size <= 512 * 1024 * 1024
assert int(lfc_file_blocks) <= 128 * 1024