Compare commits

...

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
93b0f44a06 Set lfc_size_limit as Min(lfc_size_limit, lfc_max_mem) 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
fc206e60d2 Add lfc_max_mem GUC 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
7b8ccee8db Restore neon.lfs_file_parth GUC 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
d8d38f2c42 Monitor avaiable memory size in local file cache and shrink cache if watermark is reached 2023-08-24 17:54:22 +03:00

View File

@@ -14,6 +14,7 @@
*/
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/statvfs.h>
#include <unistd.h>
#include <fcntl.h>
@@ -59,12 +60,17 @@
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define CHUNK_SIZE (BLOCKS_PER_CHUNK * BLCKSZ)
#define MB ((uint64)1024*1024)
#ifndef MADV_REMOVE
#define MADV_REMOVE MADV_FREE /* MacOS doesn't have MADV_REMOVE and at Linux MADV_FREE works only for MAP_PRIVATE */
#endif
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
#define MAX_MEM_WRITE_RATE 10000 /* MB/sec */
typedef struct FileCacheEntry
{
@@ -83,21 +89,56 @@ typedef struct FileCacheControl
} FileCacheControl;
static HTAB* lfc_hash;
static int lfc_desc;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_max_mem;
static int lfc_size_limit;
static int lfc_free_space_watermark;
static int lfc_free_memory_watermark;
static char* lfc_base_addr;
static char* lfc_path;
static FileCacheControl* lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark or lfc_free_memory_watermak are reached */
void FileCacheMonitorMain(Datum main_arg);
#ifdef __APPLE__
#include <sys/types.h>
#include <sys/sysctl.h>
static size_t
get_available_memory(void)
{
size_t total;
size_t sizeof_total = sizeof(total);
if (sysctlbyname("hw.memsize", &total, &sizeof_total, NULL, 0) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
return total;
}
#else
#include <sys/sysinfo.h>
static size_t
get_available_memory(void)
{
struct sysinfo si;
if (sysinfo(&si) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
return si.totalram*si.mem_unit;
}
#endif
static void
lfc_shmem_startup(void)
{
@@ -111,7 +152,7 @@ lfc_shmem_startup(void)
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE, &found);
if (!found)
{
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
@@ -126,10 +167,10 @@ lfc_shmem_startup(void)
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */
(void)unlink(lfc_path);
}
lfc_base_addr = (char*)TYPEALIGN(CHUNK_SIZE, lfc_ctl+1);
if (!found)
madvise(lfc_base_addr, lfc_max_size*MB, MADV_REMOVE);
LWLockRelease(AddinShmemInitLock);
}
@@ -141,7 +182,9 @@ lfc_shmem_request(void)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
lfc_max_size = Min(lfc_max_size, lfc_max_mem);
lfc_size_limit = Min(lfc_size_limit, lfc_max_mem);
RequestAddinShmemSpace(sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
RequestNamedLWLockTranche("lfc_lock", 1);
}
@@ -167,26 +210,14 @@ lfc_change_limit_hook(int newval, void *extra)
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
return;
/* Open cache file if not done yet */
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
#endif
if (madvise(lfc_base_addr + victim->offset*CHUNK_SIZE, CHUNK_SIZE, MADV_REMOVE) < 0)
elog(LOG, "Failed to punch hole in memory: %m");
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
@@ -195,10 +226,11 @@ lfc_change_limit_hook(int newval, void *extra)
}
/*
* Local file system state monitor check available free space.
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
* Local file system state monitor check available free space and memory.
* If available disk space is lower than lfc_free_space_watermark or
* available memory is lower than lfc_free_memory_watermark then we shrink size of local cache
* but throwing away least recently accessed chunks.
* First time low space watermark is reached cache size is divided by two,
* First time the watermark is reached cache size is divided by two,
* second time by four,... Finally we remove all chunks from local cache.
*
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
@@ -213,9 +245,9 @@ FileCacheMonitorMain(Datum main_arg)
{
/*
* Choose file system state monitor interval so that space can not be exosted
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
* during this period but not longer than MAX_MONITOR_INTERVAL (1 sec)
*/
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_MEM_WRITE_RATE);
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -226,25 +258,17 @@ FileCacheMonitorMain(Datum main_arg)
/* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending)
{
if (lfc_size_limit != 0)
if (lfc_size_limit != 0 && lfc_free_memory_watermark != 0 )
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
if (get_available_memory() < lfc_free_memory_watermark*MB)
{
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
{
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
{
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
}
lfc_shrinking_factor = 0; /* reset to initial value */
}
pg_usleep(monitor_interval);
}
@@ -278,6 +302,18 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
/* TODO: left only for compatibility with on-disk cache */
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
&lfc_path,
"file.cache",
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -291,6 +327,19 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.max_inmem_cache_size",
"Maximal size used by Neon local file cache in memory",
NULL,
&lfc_max_mem,
128, /* 128Mb */
0,
INT_MAX,
PGC_POSTMASTER,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_size_limit",
"Current limit for size of Neon local file cache",
NULL,
@@ -304,11 +353,11 @@ lfc_init(void)
lfc_change_limit_hook,
NULL);
DefineCustomIntVariable("neon.free_space_watermark",
"Minimal free space in local file system after reaching which local file cache will be truncated",
DefineCustomIntVariable("neon.free_memory_watermark",
"Minimal free memory in system after reaching which local file cache will be truncated",
NULL,
&lfc_free_space_watermark,
1024, /* 1GB */
&lfc_free_memory_watermark,
0, /* disabled by default, because iurt makes sense only when local file cache is located i tmpfs */
0,
INT_MAX,
PGC_SIGHUP,
@@ -317,17 +366,6 @@ lfc_init(void)
NULL,
NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
&lfc_path,
"file.cache",
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -476,27 +514,7 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
dlist_delete(&entry->lru_node);
LWLockRelease(lfc_lock);
/* Open cache file if not done yet */
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
lfc_size_limit = 0; /* disable file cache */
result = false;
}
}
if (lfc_desc > 0)
{
rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
elog(INFO, "Failed to read file cache: %m");
lfc_size_limit = 0; /* disable file cache */
result = false;
}
}
memcpy(buffer, lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, BLCKSZ);
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -569,24 +587,8 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
}
LWLockRelease(lfc_lock);
/* Open cache file if not done yet */
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
}
}
if (lfc_desc > 0)
{
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
elog(WARNING, "Failed to write file cache: %m, disabling file cache");
lfc_size_limit = 0; /* disable file cache */
}
}
memcpy(lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, buffer, BLCKSZ);
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
Assert(entry->access_count > 0);