Compare commits

...

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
93b0f44a06 Set lfc_size_limit as Min(lfc_size_limit, lfc_max_mem) 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
fc206e60d2 Add lfc_max_mem GUC 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
7b8ccee8db Restore neon.lfs_file_parth GUC 2023-08-24 17:54:22 +03:00
Konstantin Knizhnik
d8d38f2c42 Monitor avaiable memory size in local file cache and shrink cache if watermark is reached 2023-08-24 17:54:22 +03:00

View File

@@ -14,6 +14,7 @@
*/ */
#include <sys/file.h> #include <sys/file.h>
#include <sys/mman.h>
#include <sys/statvfs.h> #include <sys/statvfs.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
@@ -59,12 +60,17 @@
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed * 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/ */
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ #define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define CHUNK_SIZE (BLOCKS_PER_CHUNK * BLCKSZ)
#define MB ((uint64)1024*1024) #define MB ((uint64)1024*1024)
#ifndef MADV_REMOVE
#define MADV_REMOVE MADV_FREE /* MacOS doesn't have MADV_REMOVE and at Linux MADV_FREE works only for MAP_PRIVATE */
#endif
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */ #define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */ #define MAX_MEM_WRITE_RATE 10000 /* MB/sec */
typedef struct FileCacheEntry typedef struct FileCacheEntry
{ {
@@ -83,21 +89,56 @@ typedef struct FileCacheControl
} FileCacheControl; } FileCacheControl;
static HTAB* lfc_hash; static HTAB* lfc_hash;
static int lfc_desc;
static LWLockId lfc_lock; static LWLockId lfc_lock;
static int lfc_max_size; static int lfc_max_size;
static int lfc_max_mem;
static int lfc_size_limit; static int lfc_size_limit;
static int lfc_free_space_watermark; static int lfc_free_space_watermark;
static int lfc_free_memory_watermark;
static char* lfc_base_addr;
static char* lfc_path; static char* lfc_path;
static FileCacheControl* lfc_ctl; static FileCacheControl* lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook; static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000 #if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook; static shmem_request_hook_type prev_shmem_request_hook;
#endif #endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */ static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark or lfc_free_memory_watermak are reached */
void FileCacheMonitorMain(Datum main_arg); void FileCacheMonitorMain(Datum main_arg);
#ifdef __APPLE__
#include <sys/types.h>
#include <sys/sysctl.h>
static size_t
get_available_memory(void)
{
size_t total;
size_t sizeof_total = sizeof(total);
if (sysctlbyname("hw.memsize", &total, &sizeof_total, NULL, 0) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
return total;
}
#else
#include <sys/sysinfo.h>
static size_t
get_available_memory(void)
{
struct sysinfo si;
if (sysinfo(&si) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
return si.totalram*si.mem_unit;
}
#endif
static void static void
lfc_shmem_startup(void) lfc_shmem_startup(void)
{ {
@@ -111,7 +152,7 @@ lfc_shmem_startup(void)
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE, &found);
if (!found) if (!found)
{ {
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
@@ -126,10 +167,10 @@ lfc_shmem_startup(void)
lfc_ctl->size = 0; lfc_ctl->size = 0;
lfc_ctl->used = 0; lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */
(void)unlink(lfc_path);
} }
lfc_base_addr = (char*)TYPEALIGN(CHUNK_SIZE, lfc_ctl+1);
if (!found)
madvise(lfc_base_addr, lfc_max_size*MB, MADV_REMOVE);
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
} }
@@ -141,7 +182,9 @@ lfc_shmem_request(void)
prev_shmem_request_hook(); prev_shmem_request_hook();
#endif #endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry))); lfc_max_size = Min(lfc_max_size, lfc_max_mem);
lfc_size_limit = Min(lfc_size_limit, lfc_max_mem);
RequestAddinShmemSpace(sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
RequestNamedLWLockTranche("lfc_lock", 1); RequestNamedLWLockTranche("lfc_lock", 1);
} }
@@ -167,26 +210,14 @@ lfc_change_limit_hook(int newval, void *extra)
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker()) if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
return; return;
/* Open cache file if not done yet */
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{ {
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0); Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE if (madvise(lfc_base_addr + victim->offset*CHUNK_SIZE, CHUNK_SIZE, MADV_REMOVE) < 0)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) elog(LOG, "Failed to punch hole in memory: %m");
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
lfc_ctl->used -= 1; lfc_ctl->used -= 1;
} }
@@ -195,10 +226,11 @@ lfc_change_limit_hook(int newval, void *extra)
} }
/* /*
* Local file system state monitor check available free space. * Local file system state monitor check available free space and memory.
* If it is lower than lfc_free_space_watermark then we shrink size of local cache * If available disk space is lower than lfc_free_space_watermark or
* available memory is lower than lfc_free_memory_watermark then we shrink size of local cache
* but throwing away least recently accessed chunks. * but throwing away least recently accessed chunks.
* First time low space watermark is reached cache size is divided by two, * First time the watermark is reached cache size is divided by two,
* second time by four,... Finally we remove all chunks from local cache. * second time by four,... Finally we remove all chunks from local cache.
* *
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler. * Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
@@ -213,9 +245,9 @@ FileCacheMonitorMain(Datum main_arg)
{ {
/* /*
* Choose file system state monitor interval so that space can not be exosted * Choose file system state monitor interval so that space can not be exosted
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec) * during this period but not longer than MAX_MONITOR_INTERVAL (1 sec)
*/ */
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE); uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_MEM_WRITE_RATE);
/* Establish signal handlers. */ /* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -226,25 +258,17 @@ FileCacheMonitorMain(Datum main_arg)
/* Periodically dump buffers until terminated. */ /* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending) while (!ShutdownRequestPending)
{ {
if (lfc_size_limit != 0) if (lfc_size_limit != 0 && lfc_free_memory_watermark != 0 )
{ {
struct statvfs sfs; if (get_available_memory() < lfc_free_memory_watermark*MB)
if (statvfs(lfc_path, &sfs) < 0)
{ {
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path); if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
} }
else else
{ lfc_shrinking_factor = 0; /* reset to initial value */
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
{
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
}
} }
pg_usleep(monitor_interval); pg_usleep(monitor_interval);
} }
@@ -278,6 +302,18 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress) if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
/* TODO: left only for compatibility with on-disk cache */
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
&lfc_path,
"file.cache",
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size", DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache", "Maximal size of Neon local file cache",
NULL, NULL,
@@ -291,6 +327,19 @@ lfc_init(void)
NULL, NULL,
NULL); NULL);
DefineCustomIntVariable("neon.max_inmem_cache_size",
"Maximal size used by Neon local file cache in memory",
NULL,
&lfc_max_mem,
128, /* 128Mb */
0,
INT_MAX,
PGC_POSTMASTER,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_size_limit", DefineCustomIntVariable("neon.file_cache_size_limit",
"Current limit for size of Neon local file cache", "Current limit for size of Neon local file cache",
NULL, NULL,
@@ -304,11 +353,11 @@ lfc_init(void)
lfc_change_limit_hook, lfc_change_limit_hook,
NULL); NULL);
DefineCustomIntVariable("neon.free_space_watermark", DefineCustomIntVariable("neon.free_memory_watermark",
"Minimal free space in local file system after reaching which local file cache will be truncated", "Minimal free memory in system after reaching which local file cache will be truncated",
NULL, NULL,
&lfc_free_space_watermark, &lfc_free_memory_watermark,
1024, /* 1GB */ 0, /* disabled by default, because iurt makes sense only when local file cache is located i tmpfs */
0, 0,
INT_MAX, INT_MAX,
PGC_SIGHUP, PGC_SIGHUP,
@@ -317,17 +366,6 @@ lfc_init(void)
NULL, NULL,
NULL); NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
&lfc_path,
"file.cache",
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0) if (lfc_max_size == 0)
return; return;
@@ -476,27 +514,7 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
dlist_delete(&entry->lru_node); dlist_delete(&entry->lru_node);
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
/* Open cache file if not done yet */ memcpy(buffer, lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, BLCKSZ);
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
lfc_size_limit = 0; /* disable file cache */
result = false;
}
}
if (lfc_desc > 0)
{
rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
elog(INFO, "Failed to read file cache: %m");
lfc_size_limit = 0; /* disable file cache */
result = false;
}
}
/* Place entry to the head of LRU list */ /* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -569,24 +587,8 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
} }
LWLockRelease(lfc_lock); LWLockRelease(lfc_lock);
/* Open cache file if not done yet */ memcpy(lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, buffer, BLCKSZ);
if (lfc_desc == 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
}
}
if (lfc_desc > 0)
{
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
elog(WARNING, "Failed to write file cache: %m, disabling file cache");
lfc_size_limit = 0; /* disable file cache */
}
}
/* Place entry to the head of LRU list */ /* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
Assert(entry->access_count > 0); Assert(entry->access_count > 0);