Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-15 09:22:55 +00:00)
refer #5347
refer #5351 Make changing state of local file cache more consistent
@@ -83,12 +83,14 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
bool enabled; /* local file cache state */
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;

static HTAB* lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static bool lfc_enabled;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_free_space_watermark;
@@ -111,40 +113,51 @@ void FileCacheMonitorMain(Datum main_arg);
static void
lfc_disable(char const* op)
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;

elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
if (op)
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
else
elog(LOG, "Disable local file cache");

if (lfc_desc > 0)
close(lfc_desc);

lfc_desc = -1;
lfc_size_limit = 0;

/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (lfc_ctl->enabled)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
HASH_SEQ_STATUS status;
FileCacheEntry* entry;

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->enabled = false;
dlist_init(&lfc_ctl->lru);
}
LWLockRelease(lfc_lock);
}

static bool
lfc_is_enabled(void)
{
return lfc_ctl && lfc_ctl->enabled;
}

static bool
lfc_ensure_opened(void)
{
bool enabled = lfc_is_enabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);

@@ -153,7 +166,7 @@ lfc_ensure_opened(void)
return false;
}
}
return true;
return enabled;
}

static void
@@ -184,6 +197,7 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->enabled = lfc_enabled;
dlist_init(&lfc_ctl->lru);

/* Remove file cache on restart */
@@ -204,6 +218,39 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1);
}

static bool
is_normal_backend(void)
{
/*
* The stats collector detaches shared memory, so we should not try to access shared memory here.
* Parallel workers first assign the default value (0), so do not perform truncation in parallel workers.
* The postmaster can handle SIGHUP and has access to shared memory (UsedShmemSegAddr != NULL), but it has no PGPROC.
*/
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}

static void
lfc_change_state(bool newval, void *extra)
{
if (!is_normal_backend())
return;

if (!newval)
{
if (lfc_ctl)
lfc_disable(NULL);
else
elog(LOG, "Local file cache disabled");
}
else
{
elog(LOG, "Local file cache enabled");
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->enabled = newval;
LWLockRelease(lfc_lock);
}
}

static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -219,25 +266,20 @@ static void
lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())

if (!is_normal_backend())
return;

if (!lfc_ensure_opened())
return;

/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!lfc_ctl->enabled)
{
LWLockRelease(lfc_lock);
return;
}
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -251,6 +293,7 @@ lfc_change_limit_hook(int newval, void *extra)
lfc_ctl->used -= 1;
}
elog(DEBUG1, "set local file cache limit to %d", new_size);

LWLockRelease(lfc_lock);
}

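The shrink path above keeps evicting from the LRU list until the number of used chunks fits under the new limit. As a toy sketch only (not the Neon code: a plain array stands in for the Postgres dlist, and the hash-entry removal and file hole-punching are omitted), the loop's shape is:

#include <stdio.h>

#define N_CHUNKS 8

int main(void)
{
    unsigned lru[N_CHUNKS] = {5, 2, 7, 1, 4, 0, 3, 6}; /* least recently used first */
    unsigned used = N_CHUNKS;
    unsigned head = 0;
    unsigned new_size = 3; /* pretend the size limit was lowered to 3 chunks */

    /* Keep throwing away the least recently used chunk until we fit. */
    while (new_size < used && head < N_CHUNKS) /* stand-in for !dlist_is_empty(&lru) */
    {
        printf("evicting chunk %u\n", lru[head]);
        head += 1;
        used -= 1;
    }
    printf("used=%u\n", used); /* 3 */
    return 0;
}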
@@ -286,7 +329,7 @@ FileCacheMonitorMain(Datum main_arg)
/* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending)
{
if (lfc_size_limit != 0)
if (lfc_is_enabled())
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
@@ -338,6 +381,17 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");

DefineCustomBoolVariable("neon.local_file_cache",
"Enable or disable local file cache",
NULL,
&lfc_enabled,
true, /* enabled by default */
PGC_POSTMASTER,
0,
NULL,
lfc_change_state,
NULL);

DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
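For illustration only (the values are copied from the regression test below, and the library name 'neon' is assumed; nothing here is mandated by this commit), the new GUC sits next to the existing size settings in postgresql.conf, and since it is PGC_POSTMASTER it can only change at server start:

shared_preload_libraries = 'neon'
neon.local_file_cache = on
neon.max_file_cache_size = '64MB'
neon.file_cache_size_limit = '10MB'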
@@ -414,10 +468,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag;
FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found;
bool found = false;
uint32 hash;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (!lfc_is_enabled()) /* fast exit if file cache is disabled */
return false;

CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -426,8 +480,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_SHARED);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
if (lfc_ctl->enabled)
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock);
return found;
}
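The membership test above packs one presence bit per block of a chunk into 32-bit words: chunk_offs >> 5 selects the word and chunk_offs & 31 the bit within it. A minimal standalone sketch of that indexing, with BLOCKS_PER_CHUNK assumed to be 128 purely for illustration (the real constant lives in the Neon sources):

#include <stdint.h>
#include <stdio.h>

#define BLOCKS_PER_CHUNK 128 /* assumed power of two, illustration only */

typedef struct
{
    uint32_t bitmap[BLOCKS_PER_CHUNK / 32]; /* one bit per block in the chunk */
} ChunkBitmapSketch;

static void sketch_set_block(ChunkBitmapSketch *c, unsigned blkno)
{
    unsigned chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); /* offset within its chunk */
    c->bitmap[chunk_offs >> 5] |= 1u << (chunk_offs & 31);
}

static int sketch_contains_block(const ChunkBitmapSketch *c, unsigned blkno)
{
    unsigned chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
    return (c->bitmap[chunk_offs >> 5] & (1u << (chunk_offs & 31))) != 0;
}

int main(void)
{
    ChunkBitmapSketch c = {{0}};
    sketch_set_block(&c, 1000); /* block 1000 -> offset 104 within its chunk */
    printf("%d %d\n", sketch_contains_block(&c, 1000), sketch_contains_block(&c, 1001)); /* prints: 1 0 */
    return 0;
}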
@@ -444,7 +501,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (!lfc_is_enabled()) /* fast exit if file cache is disabled */
return;

CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -454,6 +511,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!lfc_ctl->enabled)
{
LWLockRelease(lfc_lock);
return;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);

if (!found)
@@ -519,7 +583,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (!lfc_is_enabled()) /* fast exit if file cache is disabled */
return false;

if (!lfc_ensure_opened())
@@ -531,6 +595,13 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!lfc_ctl->enabled)
{
LWLockRelease(lfc_lock);
return false;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
@@ -555,8 +626,10 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (lfc_ctl->generation == generation)
{
Assert(lfc_ctl->enabled);
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -591,17 +664,22 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;

if (!lfc_ensure_opened())
if (!lfc_is_enabled()) /* fast exit if file cache is disabled */
return;

tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);

CopyNRelFileInfoToBufTag(tag, rinfo);

hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!lfc_ctl->enabled)
{
LWLockRelease(lfc_lock);
return;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);

if (found)
@@ -752,13 +830,15 @@ local_cache_pages(PG_FUNCTION_ARGS)

LWLockAcquire(lfc_lock, LW_SHARED);

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (lfc_ctl->enabled)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
}
}
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages);
@@ -770,35 +850,37 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);

/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (n_pages != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
}
}
}
Assert(n_pages == funcctx->max_calls);
}
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock);
}

test_runner/regress/test_local_file_cache.py (new file, 73 lines)
@@ -0,0 +1,73 @@
import os
import pathlib
import random
import signal
import threading
import time
from typing import List

from psycopg2.extensions import connection as PgConnection

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar

def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
    env = neon_simple_env
    env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")

    endpoint = env.endpoints.create_start(
        "test_local_file_cache_unlink",
        config_lines=[
            "shared_buffers='1MB'",
            "neon.max_file_cache_size='64MB'",
            "neon.file_cache_size_limit='10MB'",
        ],
    )

    cur = endpoint.connect().cursor()

    n_rows = 100000
    n_threads = 20
    n_updates_per_thread = 10000
    n_updates_per_connection = 1000
    n_total_updates = n_threads * n_updates_per_thread

    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters matches the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread will reconnect after every 1000 updates.
    def run_updates():
        n_updates_performed = 0
        conn = endpoint.connect()
        cur = conn.cursor()
        for _ in range(n_updates_per_thread):
            id = random.randint(1, n_rows)
            cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
            n_updates_performed += 1
            if n_updates_performed % n_updates_per_connection == 0:
                cur.close()
                conn.close()
                conn = endpoint.connect()
                cur = conn.cursor()

    threads: List[threading.Thread] = []
    for i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(), daemon=True)
        thread.start()
        threads.append(thread)

    time.sleep(5)

    os.unlink(os.path.join(endpoint.pg_data_dir_path(), "file.cache"))

    for thread in threads:
        thread.join()

    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
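With the constants above, the final assertion expects SUM(n) = n_threads * n_updates_per_thread + n_rows = 20 * 10000 + 100000 = 300000, i.e. every update remains visible even though the cache file was unlinked while the updates were running.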
vendor/postgres-v15 (vendored submodule, 2 lines)
Submodule vendor/postgres-v15 updated: 026d6b093d...b4ec091547