mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-12 07:00:36 +00:00
Compare commits
4 Commits
proxy_id
...
inmem_file
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93b0f44a06 | ||
|
|
fc206e60d2 | ||
|
|
7b8ccee8db | ||
|
|
d8d38f2c42 |
@@ -14,6 +14,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <sys/file.h>
|
#include <sys/file.h>
|
||||||
|
#include <sys/mman.h>
|
||||||
#include <sys/statvfs.h>
|
#include <sys/statvfs.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
@@ -59,12 +60,17 @@
|
|||||||
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
|
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
|
||||||
*/
|
*/
|
||||||
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
|
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
|
||||||
|
#define CHUNK_SIZE (BLOCKS_PER_CHUNK * BLCKSZ)
|
||||||
#define MB ((uint64)1024*1024)
|
#define MB ((uint64)1024*1024)
|
||||||
|
|
||||||
|
#ifndef MADV_REMOVE
|
||||||
|
#define MADV_REMOVE MADV_FREE /* macOS doesn't have MADV_REMOVE, and on Linux MADV_FREE works only for MAP_PRIVATE */
|
||||||
|
#endif
|
||||||
|
|
||||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||||
|
|
||||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
||||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
#define MAX_MEM_WRITE_RATE 10000 /* MB/sec */
|
||||||
|
|
||||||
typedef struct FileCacheEntry
|
typedef struct FileCacheEntry
|
||||||
{
|
{
|
||||||
@@ -83,21 +89,56 @@ typedef struct FileCacheControl
|
|||||||
} FileCacheControl;
|
} FileCacheControl;
|
||||||
|
|
||||||
static HTAB* lfc_hash;
|
static HTAB* lfc_hash;
|
||||||
static int lfc_desc;
|
|
||||||
static LWLockId lfc_lock;
|
static LWLockId lfc_lock;
|
||||||
static int lfc_max_size;
|
static int lfc_max_size;
|
||||||
|
static int lfc_max_mem;
|
||||||
static int lfc_size_limit;
|
static int lfc_size_limit;
|
||||||
static int lfc_free_space_watermark;
|
static int lfc_free_space_watermark;
|
||||||
|
static int lfc_free_memory_watermark;
|
||||||
|
static char* lfc_base_addr;
|
||||||
static char* lfc_path;
|
static char* lfc_path;
|
||||||
static FileCacheControl* lfc_ctl;
|
static FileCacheControl* lfc_ctl;
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||||
#if PG_VERSION_NUM>=150000
|
#if PG_VERSION_NUM>=150000
|
||||||
static shmem_request_hook_type prev_shmem_request_hook;
|
static shmem_request_hook_type prev_shmem_request_hook;
|
||||||
#endif
|
#endif
|
||||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
|
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrunk when lfc_free_space_watermark or lfc_free_memory_watermark is reached */
|
||||||
|
|
||||||
void FileCacheMonitorMain(Datum main_arg);
|
void FileCacheMonitorMain(Datum main_arg);
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/sysctl.h>
|
||||||
|
|
||||||
|
static size_t
|
||||||
|
get_available_memory(void)
|
||||||
|
{
|
||||||
|
size_t total;
|
||||||
|
size_t sizeof_total = sizeof(total);
|
||||||
|
if (sysctlbyname("hw.memsize", &total, &sizeof_total, NULL, 0) < 0)
|
||||||
|
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
#include <sys/sysinfo.h>
|
||||||
|
|
||||||
|
static size_t
|
||||||
|
get_available_memory(void)
|
||||||
|
{
|
||||||
|
struct sysinfo si;
|
||||||
|
if (sysinfo(&si) < 0)
|
||||||
|
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||||
|
|
||||||
|
return si.totalram*si.mem_unit;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
lfc_shmem_startup(void)
|
lfc_shmem_startup(void)
|
||||||
{
|
{
|
||||||
@@ -111,7 +152,7 @@ lfc_shmem_startup(void)
|
|||||||
|
|
||||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE, &found);
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
||||||
@@ -126,10 +167,10 @@ lfc_shmem_startup(void)
|
|||||||
lfc_ctl->size = 0;
|
lfc_ctl->size = 0;
|
||||||
lfc_ctl->used = 0;
|
lfc_ctl->used = 0;
|
||||||
dlist_init(&lfc_ctl->lru);
|
dlist_init(&lfc_ctl->lru);
|
||||||
|
|
||||||
/* Remove file cache on restart */
|
|
||||||
(void)unlink(lfc_path);
|
|
||||||
}
|
}
|
||||||
|
lfc_base_addr = (char*)TYPEALIGN(CHUNK_SIZE, lfc_ctl+1);
|
||||||
|
if (!found)
|
||||||
|
madvise(lfc_base_addr, lfc_max_size*MB, MADV_REMOVE);
|
||||||
LWLockRelease(AddinShmemInitLock);
|
LWLockRelease(AddinShmemInitLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -141,7 +182,9 @@ lfc_shmem_request(void)
|
|||||||
prev_shmem_request_hook();
|
prev_shmem_request_hook();
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
|
lfc_max_size = Min(lfc_max_size, lfc_max_mem);
|
||||||
|
lfc_size_limit = Min(lfc_size_limit, lfc_max_mem);
|
||||||
|
RequestAddinShmemSpace(sizeof(FileCacheControl) + lfc_max_size*MB + CHUNK_SIZE + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry)));
|
||||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,26 +210,14 @@ lfc_change_limit_hook(int newval, void *extra)
|
|||||||
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
|
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* Open cache file if not done yet */
|
|
||||||
if (lfc_desc == 0)
|
|
||||||
{
|
|
||||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
|
||||||
if (lfc_desc < 0) {
|
|
||||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
||||||
{
|
{
|
||||||
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
||||||
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
||||||
Assert(victim->access_count == 0);
|
Assert(victim->access_count == 0);
|
||||||
#ifdef FALLOC_FL_PUNCH_HOLE
|
if (madvise(lfc_base_addr + victim->offset*CHUNK_SIZE, CHUNK_SIZE, MADV_REMOVE) < 0)
|
||||||
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
|
elog(LOG, "Failed to punch hole in memory: %m");
|
||||||
elog(LOG, "Failed to punch hole in file: %m");
|
|
||||||
#endif
|
|
||||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
||||||
lfc_ctl->used -= 1;
|
lfc_ctl->used -= 1;
|
||||||
}
|
}
|
||||||
@@ -195,10 +226,11 @@ lfc_change_limit_hook(int newval, void *extra)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local file system state monitor check available free space.
|
* Local file system state monitor check available free space and memory.
|
||||||
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
|
* If available disk space is lower than lfc_free_space_watermark or
|
||||||
|
* available memory is lower than lfc_free_memory_watermark then we shrink size of local cache
|
||||||
* by throwing away least recently accessed chunks.
|
* by throwing away least recently accessed chunks.
|
||||||
* First time low space watermark is reached cache size is divided by two,
|
* First time the watermark is reached cache size is divided by two,
|
||||||
* second time by four,... Finally we remove all chunks from local cache.
|
* second time by four,... Finally we remove all chunks from local cache.
|
||||||
*
|
*
|
||||||
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
||||||
@@ -213,9 +245,9 @@ FileCacheMonitorMain(Datum main_arg)
|
|||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Choose file system state monitor interval so that space can not be exhausted
|
* Choose file system state monitor interval so that space can not be exhausted
|
||||||
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
|
* during this period but not longer than MAX_MONITOR_INTERVAL_USEC (1 sec)
|
||||||
*/
|
*/
|
||||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
|
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_MEM_WRITE_RATE);
|
||||||
|
|
||||||
/* Establish signal handlers. */
|
/* Establish signal handlers. */
|
||||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||||
@@ -226,25 +258,17 @@ FileCacheMonitorMain(Datum main_arg)
|
|||||||
/* Periodically dump buffers until terminated. */
|
/* Periodically dump buffers until terminated. */
|
||||||
while (!ShutdownRequestPending)
|
while (!ShutdownRequestPending)
|
||||||
{
|
{
|
||||||
if (lfc_size_limit != 0)
|
if (lfc_size_limit != 0 && lfc_free_memory_watermark != 0 )
|
||||||
{
|
{
|
||||||
struct statvfs sfs;
|
if (get_available_memory() < lfc_free_memory_watermark*MB)
|
||||||
if (statvfs(lfc_path, &sfs) < 0)
|
|
||||||
{
|
{
|
||||||
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
|
if (lfc_shrinking_factor < 31) {
|
||||||
|
lfc_shrinking_factor += 1;
|
||||||
|
}
|
||||||
|
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
lfc_shrinking_factor = 0; /* reset to initial value */
|
||||||
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
|
|
||||||
{
|
|
||||||
if (lfc_shrinking_factor < 31) {
|
|
||||||
lfc_shrinking_factor += 1;
|
|
||||||
}
|
|
||||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pg_usleep(monitor_interval);
|
pg_usleep(monitor_interval);
|
||||||
}
|
}
|
||||||
@@ -278,6 +302,18 @@ lfc_init(void)
|
|||||||
if (!process_shared_preload_libraries_in_progress)
|
if (!process_shared_preload_libraries_in_progress)
|
||||||
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||||
|
|
||||||
|
/* TODO: left only for compatibility with on-disk cache */
|
||||||
|
DefineCustomStringVariable("neon.file_cache_path",
|
||||||
|
"Path to local file cache (can be raw device)",
|
||||||
|
NULL,
|
||||||
|
&lfc_path,
|
||||||
|
"file.cache",
|
||||||
|
PGC_POSTMASTER,
|
||||||
|
0,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.max_file_cache_size",
|
DefineCustomIntVariable("neon.max_file_cache_size",
|
||||||
"Maximal size of Neon local file cache",
|
"Maximal size of Neon local file cache",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -291,6 +327,19 @@ lfc_init(void)
|
|||||||
NULL,
|
NULL,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable("neon.max_inmem_cache_size",
|
||||||
|
"Maximal size used by Neon local file cache in memory",
|
||||||
|
NULL,
|
||||||
|
&lfc_max_mem,
|
||||||
|
128, /* 128Mb */
|
||||||
|
0,
|
||||||
|
INT_MAX,
|
||||||
|
PGC_POSTMASTER,
|
||||||
|
GUC_UNIT_MB,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.file_cache_size_limit",
|
DefineCustomIntVariable("neon.file_cache_size_limit",
|
||||||
"Current limit for size of Neon local file cache",
|
"Current limit for size of Neon local file cache",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -304,11 +353,11 @@ lfc_init(void)
|
|||||||
lfc_change_limit_hook,
|
lfc_change_limit_hook,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.free_space_watermark",
|
DefineCustomIntVariable("neon.free_memory_watermark",
|
||||||
"Minimal free space in local file system after reaching which local file cache will be truncated",
|
"Minimal free memory in system after reaching which local file cache will be truncated",
|
||||||
NULL,
|
NULL,
|
||||||
&lfc_free_space_watermark,
|
&lfc_free_memory_watermark,
|
||||||
1024, /* 1GB */
|
0, /* disabled by default, because it makes sense only when local file cache is located in tmpfs */
|
||||||
0,
|
0,
|
||||||
INT_MAX,
|
INT_MAX,
|
||||||
PGC_SIGHUP,
|
PGC_SIGHUP,
|
||||||
@@ -317,17 +366,6 @@ lfc_init(void)
|
|||||||
NULL,
|
NULL,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
DefineCustomStringVariable("neon.file_cache_path",
|
|
||||||
"Path to local file cache (can be raw device)",
|
|
||||||
NULL,
|
|
||||||
&lfc_path,
|
|
||||||
"file.cache",
|
|
||||||
PGC_POSTMASTER,
|
|
||||||
0,
|
|
||||||
NULL,
|
|
||||||
NULL,
|
|
||||||
NULL);
|
|
||||||
|
|
||||||
if (lfc_max_size == 0)
|
if (lfc_max_size == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@@ -476,27 +514,7 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
dlist_delete(&entry->lru_node);
|
dlist_delete(&entry->lru_node);
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
|
|
||||||
/* Open cache file if not done yet */
|
memcpy(buffer, lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, BLCKSZ);
|
||||||
if (lfc_desc == 0)
|
|
||||||
{
|
|
||||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
|
||||||
if (lfc_desc < 0) {
|
|
||||||
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
result = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (lfc_desc > 0)
|
|
||||||
{
|
|
||||||
rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
|
|
||||||
if (rc != BLCKSZ)
|
|
||||||
{
|
|
||||||
elog(INFO, "Failed to read file cache: %m");
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
result = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Place entry to the head of LRU list */
|
/* Place entry to the head of LRU list */
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
@@ -569,24 +587,8 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
}
|
}
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
|
|
||||||
/* Open cache file if not done yet */
|
memcpy(lfc_base_addr + ((size_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ, buffer, BLCKSZ);
|
||||||
if (lfc_desc == 0)
|
|
||||||
{
|
|
||||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
|
||||||
if (lfc_desc < 0) {
|
|
||||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (lfc_desc > 0)
|
|
||||||
{
|
|
||||||
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
|
|
||||||
if (rc != BLCKSZ)
|
|
||||||
{
|
|
||||||
elog(WARNING, "Failed to write file cache: %m, disabling file cache");
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* Place entry to the head of LRU list */
|
/* Place entry to the head of LRU list */
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
Assert(entry->access_count > 0);
|
Assert(entry->access_count > 0);
|
||||||
|
|||||||
Reference in New Issue
Block a user