diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 3a2ac380f9..143ad4bf67 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -14,6 +14,7 @@ */ #include +#include #include #include @@ -34,6 +35,9 @@ #include "storage/fd.h" #include "storage/pg_shmem.h" #include "storage/buf_internals.h" +#include "storage/procsignal.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" /* * Local file cache is used to temporary store relations pages in local file system. @@ -59,6 +63,9 @@ #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) +#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */ +#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */ + typedef struct FileCacheEntry { BufferTag key; @@ -71,6 +78,7 @@ typedef struct FileCacheEntry typedef struct FileCacheControl { uint32 size; /* size of cache file in chunks */ + uint32 used; /* number of used chunks */ dlist_head lru; /* double linked list for LRU replacement algorithm */ } FileCacheControl; @@ -79,12 +87,14 @@ static int lfc_desc; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_free_space_watermark; static char* lfc_path; static FileCacheControl* lfc_ctl; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 static shmem_request_hook_type prev_shmem_request_hook; #endif +static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */ static void lfc_shmem_startup(void) @@ -112,6 +122,7 @@ lfc_shmem_startup(void) &info, HASH_ELEM | HASH_BLOBS); lfc_ctl->size = 0; + lfc_ctl->used = 0; dlist_init(&lfc_ctl->lru); /* Remove file cache on restart */ @@ -165,7 +176,7 @@ lfc_change_limit_hook(int newval, void *extra) } } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - while (new_size < lfc_ctl->size && !dlist_is_empty(&lfc_ctl->lru)) + while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); @@ -175,12 +186,86 @@ lfc_change_limit_hook(int newval, void *extra) elog(LOG, "Failed to punch hole in file: %m"); #endif hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); - lfc_ctl->size -= 1; + lfc_ctl->used -= 1; } elog(LOG, "set local file cache limit to %d", new_size); LWLockRelease(lfc_lock); } +/* + * Local file system state monitor check available free space. + * If it is lower than lfc_free_space_watermark then we shrink size of local cache + * but throwing away least recently accessed chunks. + * First time low space watermark is reached cache size is divided by two, + * second time by four,... Finally we remove all chunks from local cache. + * + * Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler. + * We only throw away cached chunks but do not prevent from filling cache by new chunks. + * + * Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark + * disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second. + * Callinng statfs each second should not add some noticable overhead. + */ +void +FileCacheMonitorMain(Datum main_arg) +{ + /* + * Choose file system state monitor interval so that space can not be exosted + * during this period but not longer than MAX_MONITOR_INTERVAL (10 sec) + */ + uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE); + + /* Establish signal handlers. */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, SignalHandlerForShutdownRequest); + BackgroundWorkerUnblockSignals(); + + /* Periodically dump buffers until terminated. */ + while (!ShutdownRequestPending) + { + if (lfc_size_limit != 0) + { + struct statfs sfs; + if (statfs(lfc_path, &sfs) < 0) + { + elog(WARNING, "Failed to obtain status of %s: %m", lfc_path); + } + else + { + if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB) + { + if (lfc_shrinking_factor < 31) { + lfc_shrinking_factor += 1; + } + lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL); + } + else + lfc_shrinking_factor = 0; /* reset to initial value */ + } + } + pg_usleep(monitor_interval); + } +} + +static void +lfc_register_free_space_monitor(void) +{ + BackgroundWorker bgw; + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + void lfc_init(void) { @@ -217,6 +302,19 @@ lfc_init(void) lfc_change_limit_hook, NULL); + DefineCustomIntVariable("neon.free_space_watermark", + "Minimal free space in local file system after reaching which local file cache will be truncated", + NULL, + &lfc_free_space_watermark, + 1024, /* 1GB */ + 0, + INT_MAX, + PGC_SIGHUP, + GUC_UNIT_MB, + NULL, + NULL, + NULL); + DefineCustomStringVariable("neon.file_cache_path", "Path to local file cache (can be raw device)", NULL, @@ -231,6 +329,9 @@ lfc_init(void) if (lfc_max_size == 0) return; + if (lfc_free_space_watermark != 0) + lfc_register_free_space_monitor(); + prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = lfc_shmem_startup; #if PG_VERSION_NUM>=150000 @@ -380,7 +481,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, * there are should be very large number of concurrent IO operations and them are limited by max_connections, * we prefer not to complicate code and use second approach. */ - if (lfc_ctl->size >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) + if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) { /* Cache overflow: evict least recently used chunk */ FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); @@ -390,7 +491,10 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, elog(LOG, "Swap file cache page"); } else + { + lfc_ctl->used += 1; entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ + } entry->access_count = 1; memset(entry->bitmap, 0, sizeof entry->bitmap); }