mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-23 02:00:38 +00:00
Compare commits
8 Commits
proxy-cpla
...
lfc_fixes2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
773a8836cc | ||
|
|
da34a9ed89 | ||
|
|
e4b74e375c | ||
|
|
c3d1e26361 | ||
|
|
665eff5e08 | ||
|
|
7aeec2bc9d | ||
|
|
bc3a5d571f | ||
|
|
2324e5a3d3 |
@@ -20,7 +20,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
|||||||
SHLIB_LINK = -lcurl
|
SHLIB_LINK = -lcurl
|
||||||
|
|
||||||
EXTENSION = neon
|
EXTENSION = neon
|
||||||
DATA = neon--1.0.sql
|
DATA = neon--1.0.sql neon--1.0--1.1.sql
|
||||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -32,11 +32,13 @@
|
|||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
#include "utils/dynahash.h"
|
#include "utils/dynahash.h"
|
||||||
#include "utils/guc.h"
|
#include "utils/guc.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "storage/pg_shmem.h"
|
#include "storage/pg_shmem.h"
|
||||||
#include "storage/buf_internals.h"
|
#include "storage/buf_internals.h"
|
||||||
|
#include "pgstat.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local file cache is used to temporarily store relation pages in the local file system.
|
* Local file cache is used to temporarily store relation pages in the local file system.
|
||||||
@@ -65,6 +67,7 @@
|
|||||||
typedef struct FileCacheEntry
|
typedef struct FileCacheEntry
|
||||||
{
|
{
|
||||||
BufferTag key;
|
BufferTag key;
|
||||||
|
uint32 hash;
|
||||||
uint32 offset;
|
uint32 offset;
|
||||||
uint32 access_count;
|
uint32 access_count;
|
||||||
uint32 bitmap[BLOCKS_PER_CHUNK/32];
|
uint32 bitmap[BLOCKS_PER_CHUNK/32];
|
||||||
@@ -76,6 +79,9 @@ typedef struct FileCacheControl
|
|||||||
uint64 generation; /* generation is needed to handle correct hash reenabling */
|
uint64 generation; /* generation is needed to handle correct hash reenabling */
|
||||||
uint32 size; /* size of cache file in chunks */
|
uint32 size; /* size of cache file in chunks */
|
||||||
uint32 used; /* number of used chunks */
|
uint32 used; /* number of used chunks */
|
||||||
|
uint32 limit; /* shared copy of lfc_size_limit */
|
||||||
|
uint64 hits;
|
||||||
|
uint64 misses;
|
||||||
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
||||||
} FileCacheControl;
|
} FileCacheControl;
|
||||||
|
|
||||||
@@ -91,10 +97,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
|
|||||||
static shmem_request_hook_type prev_shmem_request_hook;
|
static shmem_request_hook_type prev_shmem_request_hook;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void FileCacheMonitorMain(Datum main_arg);
|
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||||
|
|
||||||
|
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local file cache is mandatory and Neon can work without it.
|
* Local file cache is optional and Neon can work without it.
|
||||||
* In case of any errors with this cache, we should disable it rather than throw an error.
|
* In case of any errors with this cache, we should disable it rather than throw an error.
|
||||||
* We should also allow re-enabling it once the source of the failure (lack of disk space, permissions, ...) is fixed.
|
* We should also allow re-enabling it once the source of the failure (lack of disk space, permissions, ...) is fixed.
|
||||||
* All cache content should be invalidated to avoid reading of stale or corrupted data
|
* All cache content should be invalidated to avoid reading of stale or corrupted data
|
||||||
@@ -102,49 +110,68 @@ void FileCacheMonitorMain(Datum main_arg);
|
|||||||
static void
|
static void
|
||||||
lfc_disable(char const* op)
|
lfc_disable(char const* op)
|
||||||
{
|
{
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
FileCacheEntry* entry;
|
|
||||||
|
|
||||||
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||||
|
|
||||||
|
/* Invalidate hash */
|
||||||
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
if (LFC_ENABLED())
|
||||||
|
{
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
FileCacheEntry* entry;
|
||||||
|
|
||||||
|
hash_seq_init(&status, lfc_hash);
|
||||||
|
while ((entry = hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
|
||||||
|
}
|
||||||
|
lfc_ctl->generation += 1;
|
||||||
|
lfc_ctl->size = 0;
|
||||||
|
lfc_ctl->used = 0;
|
||||||
|
lfc_ctl->limit = 0;
|
||||||
|
dlist_init(&lfc_ctl->lru);
|
||||||
|
|
||||||
|
if (lfc_desc > 0)
|
||||||
|
{
|
||||||
|
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
|
||||||
|
int rc = ftruncate(lfc_desc, 0);
|
||||||
|
if (rc < 0)
|
||||||
|
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
|
||||||
|
|
||||||
if (lfc_desc > 0)
|
if (lfc_desc > 0)
|
||||||
close(lfc_desc);
|
close(lfc_desc);
|
||||||
|
|
||||||
lfc_desc = -1;
|
lfc_desc = -1;
|
||||||
lfc_size_limit = 0;
|
}
|
||||||
|
|
||||||
/* Invalidate hash */
|
/*
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
* This check is done without obtaining lfc_lock, so it is unreliable
|
||||||
|
*/
|
||||||
hash_seq_init(&status, lfc_hash);
|
static bool
|
||||||
while ((entry = hash_seq_search(&status)) != NULL)
|
lfc_maybe_disabled(void)
|
||||||
{
|
{
|
||||||
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
|
return !lfc_ctl || !LFC_ENABLED();
|
||||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
|
||||||
}
|
|
||||||
hash_seq_term(&status);
|
|
||||||
lfc_ctl->generation += 1;
|
|
||||||
lfc_ctl->size = 0;
|
|
||||||
lfc_ctl->used = 0;
|
|
||||||
dlist_init(&lfc_ctl->lru);
|
|
||||||
|
|
||||||
LWLockRelease(lfc_lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
lfc_ensure_opened(void)
|
lfc_ensure_opened(void)
|
||||||
{
|
{
|
||||||
|
bool enabled = !lfc_maybe_disabled();
|
||||||
/* Open cache file if not done yet */
|
/* Open cache file if not done yet */
|
||||||
if (lfc_desc <= 0)
|
if (lfc_desc <= 0 && enabled)
|
||||||
{
|
{
|
||||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||||
|
|
||||||
if (lfc_desc < 0) {
|
if (lfc_desc < 0) {
|
||||||
lfc_disable("open");
|
lfc_disable("open");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@@ -163,6 +190,7 @@ lfc_shmem_startup(void)
|
|||||||
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
|
int fd;
|
||||||
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
|
||||||
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
|
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
|
||||||
info.keysize = sizeof(BufferTag);
|
info.keysize = sizeof(BufferTag);
|
||||||
@@ -175,10 +203,22 @@ lfc_shmem_startup(void)
|
|||||||
lfc_ctl->generation = 0;
|
lfc_ctl->generation = 0;
|
||||||
lfc_ctl->size = 0;
|
lfc_ctl->size = 0;
|
||||||
lfc_ctl->used = 0;
|
lfc_ctl->used = 0;
|
||||||
|
lfc_ctl->hits = 0;
|
||||||
|
lfc_ctl->misses = 0;
|
||||||
dlist_init(&lfc_ctl->lru);
|
dlist_init(&lfc_ctl->lru);
|
||||||
|
|
||||||
/* Remove file cache on restart */
|
/* Recreate file cache on restart */
|
||||||
(void)unlink(lfc_path);
|
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
|
||||||
|
if (fd < 0)
|
||||||
|
{
|
||||||
|
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
|
||||||
|
lfc_ctl->limit = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
close(fd);
|
||||||
|
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
LWLockRelease(AddinShmemInitLock);
|
LWLockRelease(AddinShmemInitLock);
|
||||||
}
|
}
|
||||||
@@ -195,6 +235,17 @@ lfc_shmem_request(void)
|
|||||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
is_normal_backend(void)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Stats collector detach shared memory, so we should not try to access shared memory here.
|
||||||
|
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
|
||||||
|
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
|
||||||
|
*/
|
||||||
|
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
|
||||||
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
|
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
|
||||||
{
|
{
|
||||||
@@ -210,25 +261,15 @@ static void
|
|||||||
lfc_change_limit_hook(int newval, void *extra)
|
lfc_change_limit_hook(int newval, void *extra)
|
||||||
{
|
{
|
||||||
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
|
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
|
||||||
/*
|
|
||||||
* Stats collector detach shared memory, so we should not try to access shared memory here.
|
if (!is_normal_backend())
|
||||||
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
|
return;
|
||||||
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
|
|
||||||
*/
|
if (!lfc_ensure_opened())
|
||||||
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* Open cache file if not done yet */
|
|
||||||
if (lfc_desc <= 0)
|
|
||||||
{
|
|
||||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
|
|
||||||
if (lfc_desc < 0) {
|
|
||||||
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
|
|
||||||
lfc_size_limit = 0; /* disable file cache */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
|
||||||
{
|
{
|
||||||
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
|
||||||
@@ -238,10 +279,12 @@ lfc_change_limit_hook(int newval, void *extra)
|
|||||||
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
|
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
|
||||||
elog(LOG, "Failed to punch hole in file: %m");
|
elog(LOG, "Failed to punch hole in file: %m");
|
||||||
#endif
|
#endif
|
||||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||||
lfc_ctl->used -= 1;
|
lfc_ctl->used -= 1;
|
||||||
}
|
}
|
||||||
|
lfc_ctl->limit = new_size;
|
||||||
elog(DEBUG1, "set local file cache limit to %d", new_size);
|
elog(DEBUG1, "set local file cache limit to %d", new_size);
|
||||||
|
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -255,6 +298,7 @@ lfc_init(void)
|
|||||||
if (!process_shared_preload_libraries_in_progress)
|
if (!process_shared_preload_libraries_in_progress)
|
||||||
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
|
||||||
|
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.max_file_cache_size",
|
DefineCustomIntVariable("neon.max_file_cache_size",
|
||||||
"Maximal size of Neon local file cache",
|
"Maximal size of Neon local file cache",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -315,10 +359,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
BufferTag tag;
|
BufferTag tag;
|
||||||
FileCacheEntry* entry;
|
FileCacheEntry* entry;
|
||||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||||
bool found;
|
bool found = false;
|
||||||
uint32 hash;
|
uint32 hash;
|
||||||
|
|
||||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||||
@@ -327,8 +371,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
hash = get_hash_value(lfc_hash, &tag);
|
hash = get_hash_value(lfc_hash, &tag);
|
||||||
|
|
||||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
if (LFC_ENABLED())
|
||||||
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
|
{
|
||||||
|
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||||
|
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
|
||||||
|
}
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
return found;
|
return found;
|
||||||
}
|
}
|
||||||
@@ -345,7 +392,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||||
uint32 hash;
|
uint32 hash;
|
||||||
|
|
||||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||||
return;
|
return;
|
||||||
|
|
||||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||||
@@ -355,6 +402,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
hash = get_hash_value(lfc_hash, &tag);
|
hash = get_hash_value(lfc_hash, &tag);
|
||||||
|
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
if (!LFC_ENABLED())
|
||||||
|
{
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
|
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
|
||||||
|
|
||||||
if (!found)
|
if (!found)
|
||||||
@@ -405,7 +459,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
/*
|
/*
|
||||||
* Try to read page from local cache.
|
* Try to read page from local cache.
|
||||||
* Returns true if page is found in local cache.
|
* Returns true if page is found in local cache.
|
||||||
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
|
* In case of error the local file cache is disabled (lfc_ctl->limit is set to zero).
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||||
@@ -420,7 +474,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
uint64 generation;
|
uint64 generation;
|
||||||
uint32 entry_offset;
|
uint32 entry_offset;
|
||||||
|
|
||||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (!lfc_ensure_opened())
|
if (!lfc_ensure_opened())
|
||||||
@@ -432,10 +486,19 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
hash = get_hash_value(lfc_hash, &tag);
|
hash = get_hash_value(lfc_hash, &tag);
|
||||||
|
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
if (!LFC_ENABLED())
|
||||||
|
{
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||||
{
|
{
|
||||||
/* Page is not cached */
|
/* Page is not cached */
|
||||||
|
lfc_ctl->misses += 1;
|
||||||
|
pgBufferUsage.file_cache.misses += 1;
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -456,8 +519,12 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
|
|
||||||
/* Place entry to the head of LRU list */
|
/* Place entry to the head of LRU list */
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
if (lfc_ctl->generation == generation)
|
if (lfc_ctl->generation == generation)
|
||||||
{
|
{
|
||||||
|
Assert(LFC_ENABLED());
|
||||||
|
lfc_ctl->hits += 1;
|
||||||
|
pgBufferUsage.file_cache.hits += 1;
|
||||||
Assert(entry->access_count > 0);
|
Assert(entry->access_count > 0);
|
||||||
if (--entry->access_count == 0)
|
if (--entry->access_count == 0)
|
||||||
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
|
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
|
||||||
@@ -489,7 +556,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||||
uint32 hash;
|
uint32 hash;
|
||||||
|
|
||||||
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (!lfc_ensure_opened())
|
if (!lfc_ensure_opened())
|
||||||
@@ -497,12 +564,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
|
|
||||||
tag.forkNum = forkNum;
|
tag.forkNum = forkNum;
|
||||||
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
|
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
|
||||||
|
|
||||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||||
|
|
||||||
hash = get_hash_value(lfc_hash, &tag);
|
hash = get_hash_value(lfc_hash, &tag);
|
||||||
|
|
||||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
if (!LFC_ENABLED())
|
||||||
|
{
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
|
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
|
||||||
|
|
||||||
if (found)
|
if (found)
|
||||||
@@ -521,13 +593,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
|
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
|
||||||
* we prefer not to complicate code and use second approach.
|
* we prefer not to complicate code and use second approach.
|
||||||
*/
|
*/
|
||||||
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
|
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
|
||||||
{
|
{
|
||||||
/* Cache overflow: evict least recently used chunk */
|
/* Cache overflow: evict least recently used chunk */
|
||||||
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
|
||||||
Assert(victim->access_count == 0);
|
Assert(victim->access_count == 0);
|
||||||
entry->offset = victim->offset; /* grab victim's chunk */
|
entry->offset = victim->offset; /* grab victim's chunk */
|
||||||
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
|
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||||
elog(DEBUG2, "Swap file cache page");
|
elog(DEBUG2, "Swap file cache page");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -536,6 +608,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
|
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
|
||||||
}
|
}
|
||||||
entry->access_count = 1;
|
entry->access_count = 1;
|
||||||
|
entry->hash = hash;
|
||||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -557,6 +630,104 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
TupleDesc tupdesc;
|
||||||
|
} NeonGetStatsCtx;
|
||||||
|
|
||||||
|
#define NUM_NEON_GET_STATS_COLS 2
|
||||||
|
#define NUM_NEON_GET_STATS_ROWS 3
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(neon_get_stats);
|
||||||
|
Datum
|
||||||
|
neon_get_stats(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
FuncCallContext *funcctx;
|
||||||
|
NeonGetStatsCtx* fctx;
|
||||||
|
MemoryContext oldcontext;
|
||||||
|
TupleDesc tupledesc;
|
||||||
|
Datum result;
|
||||||
|
HeapTuple tuple;
|
||||||
|
char const* key;
|
||||||
|
uint64 value;
|
||||||
|
Datum values[NUM_NEON_GET_STATS_COLS];
|
||||||
|
bool nulls[NUM_NEON_GET_STATS_COLS];
|
||||||
|
|
||||||
|
if (SRF_IS_FIRSTCALL())
|
||||||
|
{
|
||||||
|
funcctx = SRF_FIRSTCALL_INIT();
|
||||||
|
|
||||||
|
/* Switch context when allocating stuff to be used in later calls */
|
||||||
|
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||||
|
|
||||||
|
/* Create a user function context for cross-call persistence */
|
||||||
|
fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx));
|
||||||
|
|
||||||
|
/* Construct a tuple descriptor for the result rows. */
|
||||||
|
tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS);
|
||||||
|
|
||||||
|
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "ns_key",
|
||||||
|
TEXTOID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "ns_value",
|
||||||
|
TEXTOID, -1, 0);
|
||||||
|
|
||||||
|
fctx->tupdesc = BlessTupleDesc(tupledesc);
|
||||||
|
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
|
||||||
|
funcctx->user_fctx = fctx;
|
||||||
|
|
||||||
|
/* Return to original context when allocating transient memory */
|
||||||
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
}
|
||||||
|
|
||||||
|
funcctx = SRF_PERCALL_SETUP();
|
||||||
|
|
||||||
|
/* Get the saved state */
|
||||||
|
fctx = (NeonGetStatsCtx*) funcctx->user_fctx;
|
||||||
|
|
||||||
|
switch (funcctx->call_cntr)
|
||||||
|
{
|
||||||
|
case 0:
|
||||||
|
key = "file_cache_misses";
|
||||||
|
if (lfc_ctl)
|
||||||
|
value = lfc_ctl->misses;
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
key = "file_cache_hits";
|
||||||
|
if (lfc_ctl)
|
||||||
|
value = lfc_ctl->hits;
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
key = "file_cache_used";
|
||||||
|
if (lfc_ctl)
|
||||||
|
value = lfc_ctl->used;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
SRF_RETURN_DONE(funcctx);
|
||||||
|
}
|
||||||
|
values[0] = PointerGetDatum(cstring_to_text(key));
|
||||||
|
nulls[0] = false;
|
||||||
|
if (lfc_ctl)
|
||||||
|
{
|
||||||
|
char buf[64];
|
||||||
|
snprintf(buf, sizeof buf, "%llu", (long long)value);
|
||||||
|
nulls[1] = false;
|
||||||
|
values[1] = PointerGetDatum(cstring_to_text(buf));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
nulls[1] = true;
|
||||||
|
|
||||||
|
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
|
||||||
|
result = HeapTupleGetDatum(tuple);
|
||||||
|
SRF_RETURN_NEXT(funcctx, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Function returning data from the local file cache
|
||||||
|
* relation node/tablespace/database/blocknum and access_counter
|
||||||
|
*/
|
||||||
|
PG_FUNCTION_INFO_V1(local_cache_pages);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Record structure holding the to be exposed cache data.
|
* Record structure holding the to be exposed cache data.
|
||||||
*/
|
*/
|
||||||
@@ -580,11 +751,6 @@ typedef struct
|
|||||||
LocalCachePagesRec *record;
|
LocalCachePagesRec *record;
|
||||||
} LocalCachePagesContext;
|
} LocalCachePagesContext;
|
||||||
|
|
||||||
/*
|
|
||||||
* Function returning data from the local file cache
|
|
||||||
* relation node/tablespace/database/blocknum and access_counter
|
|
||||||
*/
|
|
||||||
PG_FUNCTION_INFO_V1(local_cache_pages);
|
|
||||||
|
|
||||||
#define NUM_LOCALCACHE_PAGES_ELEM 7
|
#define NUM_LOCALCACHE_PAGES_ELEM 7
|
||||||
|
|
||||||
@@ -653,13 +819,15 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||||
|
|
||||||
hash_seq_init(&status, lfc_hash);
|
if (LFC_ENABLED())
|
||||||
while ((entry = hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
{
|
||||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
hash_seq_init(&status, lfc_hash);
|
||||||
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
|
while ((entry = hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++)
|
||||||
|
n_pages += pg_popcount32(entry->bitmap[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
hash_seq_term(&status);
|
|
||||||
fctx->record = (LocalCachePagesRec *)
|
fctx->record = (LocalCachePagesRec *)
|
||||||
MemoryContextAllocHuge(CurrentMemoryContext,
|
MemoryContextAllocHuge(CurrentMemoryContext,
|
||||||
sizeof(LocalCachePagesRec) * n_pages);
|
sizeof(LocalCachePagesRec) * n_pages);
|
||||||
@@ -671,35 +839,33 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
|||||||
/* Return to original context when allocating transient memory */
|
/* Return to original context when allocating transient memory */
|
||||||
MemoryContextSwitchTo(oldcontext);
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
|
||||||
/*
|
if (n_pages != 0)
|
||||||
* Scan through all the buffers, saving the relevant fields in the
|
|
||||||
* fctx->record structure.
|
|
||||||
*
|
|
||||||
* We don't hold the partition locks, so we don't get a consistent
|
|
||||||
* snapshot across all buffers, but we do grab the buffer header
|
|
||||||
* locks, so the information of each buffer is self-consistent.
|
|
||||||
*/
|
|
||||||
n_pages = 0;
|
|
||||||
hash_seq_init(&status, lfc_hash);
|
|
||||||
while ((entry = hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
{
|
||||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
/*
|
||||||
|
* Scan through all the cache entries, saving the relevant fields in the
|
||||||
|
* fctx->record structure.
|
||||||
|
*/
|
||||||
|
uint32 n = 0;
|
||||||
|
hash_seq_init(&status, lfc_hash);
|
||||||
|
while ((entry = hash_seq_search(&status)) != NULL)
|
||||||
{
|
{
|
||||||
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
|
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||||
{
|
{
|
||||||
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
|
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
|
||||||
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
|
{
|
||||||
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
|
fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
|
||||||
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
|
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
|
||||||
fctx->record[n_pages].forknum = entry->key.forkNum;
|
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
|
||||||
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
|
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
|
||||||
fctx->record[n_pages].accesscount = entry->access_count;
|
fctx->record[n].forknum = entry->key.forkNum;
|
||||||
n_pages += 1;
|
fctx->record[n].blocknum = entry->key.blockNum + i;
|
||||||
|
fctx->record[n].accesscount = entry->access_count;
|
||||||
|
n += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Assert(n_pages == n);
|
||||||
}
|
}
|
||||||
hash_seq_term(&status);
|
|
||||||
Assert(n_pages == funcctx->max_calls);
|
|
||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
10
pgxn/neon/neon--1.0--1.1.sql
Normal file
10
pgxn/neon/neon--1.0--1.1.sql
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit
|
||||||
|
|
||||||
|
CREATE FUNCTION neon_get_stats()
|
||||||
|
RETURNS SETOF RECORD
|
||||||
|
AS 'MODULE_PATHNAME', 'neon_get_stats'
|
||||||
|
LANGUAGE C PARALLEL SAFE;
|
||||||
|
|
||||||
|
-- Create a view for convenient access.
|
||||||
|
CREATE VIEW neon_stats AS
|
||||||
|
SELECT P.* FROM neon_get_stats() AS P (ns_key text, ns_value text);
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
# neon extension
|
# neon extension
|
||||||
comment = 'cloud storage for PostgreSQL'
|
comment = 'cloud storage for PostgreSQL'
|
||||||
default_version = '1.0'
|
default_version = '1.1'
|
||||||
module_pathname = '$libdir/neon'
|
module_pathname = '$libdir/neon'
|
||||||
|
|||||||
74
test_runner/regress/test_local_file_cache.py
Normal file
74
test_runner/regress/test_local_file_cache.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
import os
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from fixtures.neon_fixtures import NeonEnv
|
||||||
|
from fixtures.utils import query_scalar
|
||||||
|
|
||||||
|
|
||||||
|
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
    """
    Run concurrent UPDATEs against an endpoint that uses a local file cache,
    rename the directory holding the cache file while the workload is still
    running, and check that the sum of all counters matches the number of
    updates performed.
    """
    env = neon_simple_env
    branch_name = "test_local_file_cache_unlink"

    cache_dir = os.path.join(env.repo_dir, "file_cache")
    os.mkdir(cache_dir)

    env.neon_cli.create_branch(branch_name, "empty")

    endpoint_config = [
        "shared_buffers='1MB'",
        f"neon.file_cache_path='{cache_dir}/file.cache'",
        "neon.max_file_cache_size='64MB'",
        "neon.file_cache_size_limit='10MB'",
    ]
    endpoint = env.endpoints.create_start(branch_name, config_lines=endpoint_config)

    cur = endpoint.connect().cursor()

    n_rows = 100000
    n_threads = 20
    n_updates_per_thread = 10000
    n_updates_per_connection = 1000
    n_total_updates = n_threads * n_updates_per_thread

    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Worker body: perform random single-row UPDATEs. Every UPDATE bumps the
    # counter of one row, so at the end the sum of all counters must equal the
    # number of updates performed plus the initial 1 stored in each row.
    #
    # The worker also drops its connection and opens a new one after every
    # n_updates_per_connection updates.
    def run_updates():
        worker_conn = endpoint.connect()
        worker_cur = worker_conn.cursor()
        for done in range(1, n_updates_per_thread + 1):
            row_id = random.randint(1, n_rows)
            worker_cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {row_id}")
            if done % n_updates_per_connection == 0:
                worker_cur.close()
                worker_conn.close()
                worker_conn = endpoint.connect()
                worker_cur = worker_conn.cursor()

    workers: List[threading.Thread] = []
    for _ in range(n_threads):
        worker = threading.Thread(target=run_updates, args=(), daemon=True)
        worker.start()
        workers.append(worker)

    # Let the workload run for a while before moving the cache directory away.
    time.sleep(5)

    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
    os.rename(cache_dir, new_cache_dir)

    for worker in workers:
        worker.join()

    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
|
||||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 5d5cfee127...80932ee1ae
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 74cfe3e681...327f4e5e7d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 389ce36b4b...e6c1ece2ef
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"postgres-v16": "389ce36b4b3da7aa654a25e1b3f10b641319a87f",
|
"postgres-v16": "e6c1ece2efbf52277565936925ae43599ce51730",
|
||||||
"postgres-v15": "74cfe3e681836747a31fdbd47bdd14b3d81b0772",
|
"postgres-v15": "327f4e5e7db1050ce1b79e19f3d27ef1cff7acca",
|
||||||
"postgres-v14": "5d5cfee12783f0989a9c9fe13bb40b5585812568"
|
"postgres-v14": "80932ee1ae0b8e645390546ed104c357cddc7d0c"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user