mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-18 10:52:55 +00:00
Compare commits
4 Commits
remove_ini
...
inmem_file
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93b0f44a06 | ||
|
|
fc206e60d2 | ||
|
|
7b8ccee8db | ||
|
|
d8d38f2c42 |
@@ -14,6 +14,7 @@
|
||||
*/
|
||||
|
||||
#include <sys/file.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/statvfs.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
@@ -59,12 +60,17 @@
|
||||
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
|
||||
*/
|
||||
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
|
||||
#define CHUNK_SIZE (BLOCKS_PER_CHUNK * BLCKSZ)
|
||||
#define MB ((uint64)1024*1024)
|
||||
|
||||
#ifndef MADV_REMOVE
|
||||
#define MADV_REMOVE MADV_FREE /* MacOS doesn't have MADV_REMOVE and at Linux MADV_FREE works only for MAP_PRIVATE */
|
||||
#endif
|
||||
|
||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||
|
||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
||||
#define MAX_MEM_WRITE_RATE 10000 /* MB/sec */
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
@@ -83,21 +89,56 @@ typedef struct FileCacheControl
|
||||
} FileCacheControl;
|
||||
|
||||
static HTAB* lfc_hash;
|
||||
static int lfc_desc;
|
||||
static LWLockId lfc_lock;
|
||||
static int lfc_max_size;
|
||||
static int lfc_max_mem;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_free_space_watermark;
|
||||
static int lfc_free_memory_watermark;
|
||||
static char* lfc_base_addr;
|
||||
static char* lfc_path;
|
||||
static FileCacheControl* lfc_ctl;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
|
||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark or lfc_free_memory_watermak are reached */
|
||||
|
||||
void FileCacheMonitorMain(Datum main_arg);
|
||||
|
||||
#ifdef __APPLE__
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/sysctl.h>
|
||||
|
||||
static size_t
|
||||
get_available_memory(void)
|
||||
{
|
||||
size_t total;
|
||||
size_t sizeof_total = sizeof(total);
|
||||
if (sysctlbyname("hw.memsize", &total, &sizeof_total, NULL, 0) < 0)
|
||||
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#include <sys/sysinfo.h>
|
||||
|
||||
static size_t
|
||||
get_available_memory(void)
|
||||
{
|
||||
struct sysinfo si;
|
||||
if (sysinfo(&si) < 0)
|
||||
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||
|
||||
return si.totalram*si.mem_unit;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
static void
|
||||
lfc_shmem_startup(void)
|
||||
{
|
||||
@@ -111,7 +152,7 @@ lfc_shmem_startup(void)
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
||||
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE, &found);
|
||||
if (!found)
|
||||
{
|
||||
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
||||
@@ -126,10 +167,10 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
/* Remove file cache on restart */
|
||||
(void)unlink(lfc_path);
|
||||
}
|
||||
lfc_base_addr = (char*)TYPEALIGN(CHUNK_SIZE, lfc_ctl+1);
|
||||
if (!found)
|
||||
madvise(lfc_base_addr, lfc_max_size*MB, MADV_REMOVE);
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
|
||||
@@ -141,7 +182,9 @@ lfc_shmem_request(void)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
|
||||
lfc_max_size = Min(lfc_max_size, lfc_max_mem);
|
||||
lfc_size_limit = Min(lfc_size_limit, lfc_max_mem);
|
||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
}
|
||||
|
||||
@@ -167,26 +210,14 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
|
||||
return;
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc == 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
||||
if (lfc_desc < 0) {
|
||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
return;
|
||||
}
|
||||
}
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
||||
{
|
||||
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
||||
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
||||
Assert(victim->access_count == 0);
|
||||
#ifdef FALLOC_FL_PUNCH_HOLE
|
||||
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
|
||||
elog(LOG, "Failed to punch hole in file: %m");
|
||||
#endif
|
||||
if (madvise(lfc_base_addr + victim->offset*CHUNK_SIZE, CHUNK_SIZE, MADV_REMOVE) < 0)
|
||||
elog(LOG, "Failed to punch hole in memory: %m");
|
||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
||||
lfc_ctl->used -= 1;
|
||||
}
|
||||
@@ -195,10 +226,11 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
}
|
||||
|
||||
/*
|
||||
* Local file system state monitor check available free space.
|
||||
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
|
||||
* Local file system state monitor check available free space and memory.
|
||||
* If available disk space is lower than lfc_free_space_watermark or
|
||||
* available memory is lower than lfc_free_memory_watermark then we shrink size of local cache
|
||||
* but throwing away least recently accessed chunks.
|
||||
* First time low space watermark is reached cache size is divided by two,
|
||||
* First time the watermark is reached cache size is divided by two,
|
||||
* second time by four,... Finally we remove all chunks from local cache.
|
||||
*
|
||||
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
||||
@@ -213,9 +245,9 @@ FileCacheMonitorMain(Datum main_arg)
|
||||
{
|
||||
/*
|
||||
* Choose file system state monitor interval so that space can not be exosted
|
||||
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
|
||||
* during this period but not longer than MAX_MONITOR_INTERVAL (1 sec)
|
||||
*/
|
||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
|
||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_MEM_WRITE_RATE);
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
@@ -226,25 +258,17 @@ FileCacheMonitorMain(Datum main_arg)
|
||||
/* Periodically dump buffers until terminated. */
|
||||
while (!ShutdownRequestPending)
|
||||
{
|
||||
if (lfc_size_limit != 0)
|
||||
if (lfc_size_limit != 0 && lfc_free_memory_watermark != 0 )
|
||||
{
|
||||
struct statvfs sfs;
|
||||
if (statvfs(lfc_path, &sfs) < 0)
|
||||
if (get_available_memory() < lfc_free_memory_watermark*MB)
|
||||
{
|
||||
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
|
||||
if (lfc_shrinking_factor < 31) {
|
||||
lfc_shrinking_factor += 1;
|
||||
}
|
||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
|
||||
{
|
||||
if (lfc_shrinking_factor < 31) {
|
||||
lfc_shrinking_factor += 1;
|
||||
}
|
||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
||||
}
|
||||
else
|
||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
||||
}
|
||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
||||
}
|
||||
pg_usleep(monitor_interval);
|
||||
}
|
||||
@@ -278,6 +302,18 @@ lfc_init(void)
|
||||
if (!process_shared_preload_libraries_in_progress)
|
||||
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||
|
||||
/* TODO: left only for compatibility with on-disk cache */
|
||||
DefineCustomStringVariable("neon.file_cache_path",
|
||||
"Path to local file cache (can be raw device)",
|
||||
NULL,
|
||||
&lfc_path,
|
||||
"file.cache",
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.max_file_cache_size",
|
||||
"Maximal size of Neon local file cache",
|
||||
NULL,
|
||||
@@ -291,6 +327,19 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.max_inmem_cache_size",
|
||||
"Maximal size used by Neon local file cache in memory",
|
||||
NULL,
|
||||
&lfc_max_mem,
|
||||
128, /* 128Mb */
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
GUC_UNIT_MB,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.file_cache_size_limit",
|
||||
"Current limit for size of Neon local file cache",
|
||||
NULL,
|
||||
@@ -304,11 +353,11 @@ lfc_init(void)
|
||||
lfc_change_limit_hook,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.free_space_watermark",
|
||||
"Minimal free space in local file system after reaching which local file cache will be truncated",
|
||||
DefineCustomIntVariable("neon.free_memory_watermark",
|
||||
"Minimal free memory in system after reaching which local file cache will be truncated",
|
||||
NULL,
|
||||
&lfc_free_space_watermark,
|
||||
1024, /* 1GB */
|
||||
&lfc_free_memory_watermark,
|
||||
0, /* disabled by default, because iurt makes sense only when local file cache is located i tmpfs */
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
@@ -317,17 +366,6 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.file_cache_path",
|
||||
"Path to local file cache (can be raw device)",
|
||||
NULL,
|
||||
&lfc_path,
|
||||
"file.cache",
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
@@ -476,27 +514,7 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
dlist_delete(&entry->lru_node);
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc == 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
||||
if (lfc_desc < 0) {
|
||||
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
result = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (lfc_desc > 0)
|
||||
{
|
||||
rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
elog(INFO, "Failed to read file cache: %m");
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
result = false;
|
||||
}
|
||||
}
|
||||
memcpy(buffer, lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, BLCKSZ);
|
||||
|
||||
/* Place entry to the head of LRU list */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
@@ -569,24 +587,8 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc == 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
||||
if (lfc_desc < 0) {
|
||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
}
|
||||
}
|
||||
if (lfc_desc > 0)
|
||||
{
|
||||
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
elog(WARNING, "Failed to write file cache: %m, disabling file cache");
|
||||
lfc_size_limit = 0; /* disable file cache */
|
||||
}
|
||||
}
|
||||
memcpy(lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, buffer, BLCKSZ);
|
||||
|
||||
/* Place entry to the head of LRU list */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
Assert(entry->access_count > 0);
|
||||
|
||||
Reference in New Issue
Block a user