Replace non-thread-safe LWLocks in LFC with pthread sync primitives

This commit is contained in:
Konstantin Knizhnik
2025-02-21 08:43:43 +02:00
parent d677e85a14
commit f9dbed33a7
2 changed files with 79 additions and 76 deletions

View File

@@ -16,6 +16,7 @@
#include "postgres.h"
#include <sys/file.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
@@ -151,14 +152,14 @@ typedef struct FileCacheControl
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
pthread_mutex_t mutex;
pthread_cond_t cv[N_COND_VARS]; /* turnstile of condition variables */
} FileCacheControl;
bool lfc_store_prefetch_result;
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static char *lfc_path;
@@ -170,6 +171,25 @@ static shmem_request_hook_type prev_shmem_request_hook;
/* True while the cache limit stored in lfc_ctl is non-zero. */
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
 * Acquire the LFC lock before modifying cache state.
 *
 * Blocks on the mutex embedded in lfc_ctl. The return value of
 * pthread_mutex_lock() is not checked. Release with lfc_unlock().
 */
static void
lfc_write_lock(void)
{
pthread_mutex_lock(&lfc_ctl->mutex);
}
/*
 * Acquire the LFC lock for a section that is intended to only read
 * cache state.
 *
 * This locks the same mutex, in the same way, as lfc_write_lock(): a
 * mutex has a single mode, so holders of the "read" lock still exclude
 * one another. The return value of pthread_mutex_lock() is not checked.
 * Release with lfc_unlock().
 */
static void
lfc_read_lock(void)
{
pthread_mutex_lock(&lfc_ctl->mutex);
}
/*
 * Release the LFC mutex taken by lfc_write_lock() or lfc_read_lock().
 *
 * The return value of pthread_mutex_unlock() is not checked.
 */
static void
lfc_unlock(void)
{
pthread_mutex_unlock(&lfc_ctl->mutex);
}
/*
* Local file cache is optional and Neon can work without it.
* In case of any errors with this cache, we should disable it but not throw an error.
@@ -184,7 +204,7 @@ lfc_disable(char const *op)
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (LFC_ENABLED())
{
@@ -220,7 +240,7 @@ lfc_disable(char const *op)
}
/* Wakeup waiting backends */
for (int i = 0; i < N_COND_VARS; i++)
ConditionVariableBroadcast(&lfc_ctl->cv[i]);
pthread_cond_broadcast(&lfc_ctl->cv[i]);
}
/*
@@ -235,7 +255,7 @@ lfc_disable(char const *op)
else
close(fd);
LWLockRelease(lfc_lock);
lfc_unlock();
if (lfc_desc > 0)
close(lfc_desc);
@@ -244,7 +264,7 @@ lfc_disable(char const *op)
}
/*
* This check is done without obtaining lfc_lock, so it is unreliable
* This check is done without obtaining lock, so it is unreliable
*/
static bool
lfc_maybe_disabled(void)
@@ -293,8 +313,15 @@ lfc_shmem_startup(void)
{
int fd;
uint32 n_chunks = SIZE_MB_TO_CHUNKS(lfc_max_size);
pthread_condattr_t attrcond;
pthread_mutexattr_t attrmutex;
pthread_condattr_init(&attrcond);
pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_init(&attrmutex);
pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED);
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(FileCacheEntry);
@@ -310,6 +337,8 @@ lfc_shmem_startup(void)
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
pthread_mutex_init(&lfc_ctl->mutex, &attrmutex);
/* Initialize hyper-log-log structure for estimating working set size */
initSHLL(&lfc_ctl->wss_estimation);
@@ -328,8 +357,9 @@ lfc_shmem_startup(void)
/* Initialize turnstile of condition variables */
for (int i = 0; i < N_COND_VARS; i++)
ConditionVariableInit(&lfc_ctl->cv[i]);
{
pthread_cond_init(&lfc_ctl->cv[i], &attrcond);
}
}
LWLockRelease(AddinShmemInitLock);
}
@@ -343,7 +373,6 @@ lfc_shmem_request(void)
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
RequestNamedLWLockTranche("lfc_lock", 1);
}
static bool
@@ -382,7 +411,7 @@ lfc_change_limit_hook(int newval, void *extra)
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
return;
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (lfc_ctl->limit != new_size)
{
@@ -434,7 +463,7 @@ lfc_change_limit_hook(int newval, void *extra)
}
neon_log(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
lfc_unlock();
}
void
@@ -532,13 +561,13 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
lfc_read_lock();
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && GET_STATE(entry, chunk_offs) != UNAVAILABLE;
}
LWLockRelease(lfc_lock);
lfc_unlock();
return found;
}
@@ -569,11 +598,11 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
lfc_read_lock();
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
lfc_unlock();
return 0;
}
while (true)
@@ -612,7 +641,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
lfc_unlock();
#ifdef USE_ASSERT_CHECKING
{
@@ -685,7 +714,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
ConditionVariable* cv;
pthread_cond_t* cv;
Assert(blocks_in_chunk > 0);
@@ -699,13 +728,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
/* We can return the blocks we've read before LFC got disabled;
* assuming we read any. */
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
lfc_unlock();
return blocks_read;
}
@@ -723,7 +752,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Pages are not cached */
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
lfc_unlock();
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
@@ -742,7 +771,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
for (int i = 0; i < blocks_in_chunk; i++)
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -751,18 +779,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
} else if (state != REQUESTED) {
break;
}
if (!sleeping)
{
ConditionVariablePrepareToSleep(cv);
sleeping = true;
}
LWLockRelease(lfc_lock);
ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
}
if (sleeping)
{
ConditionVariableCancelSleep();
pthread_cond_wait(cv, &lfc_ctl->mutex);
}
if (state == AVAILABLE)
{
@@ -772,7 +789,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
else
iteration_misses++;
}
LWLockRelease(lfc_lock);
lfc_unlock();
Assert(iteration_hits + iteration_misses > 0);
@@ -791,7 +808,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (lfc_ctl->generation == generation)
{
@@ -814,11 +831,11 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
else
{
/* generation mismatch, assume error condition */
LWLockRelease(lfc_lock);
lfc_unlock();
return -1;
}
LWLockRelease(lfc_lock);
lfc_unlock();
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
@@ -951,7 +968,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
instr_time io_start, io_end;
ConditionVariable* cv;
pthread_cond_t* cv;
FileCacheBlockState state;
XLogRecPtr lwlsn;
@@ -972,11 +989,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
lfc_unlock();
return false;
}
lwlsn = GetLastWrittenLSN(rinfo, forknum, blkno);
@@ -984,7 +1001,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
lfc_unlock();
return false;
}
@@ -995,7 +1012,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
state = GET_STATE(entry, chunk_offs);
if (state != UNAVAILABLE) {
/* Do not rewrite existed LFC entry */
LWLockRelease(lfc_lock);
lfc_unlock();
return false;
}
/*
@@ -1013,7 +1030,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* We can't process this chunk due to lack of space in LFC,
* so skip to the next one
*/
LWLockRelease(lfc_lock);
lfc_unlock();
return false;
}
}
@@ -1023,7 +1040,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
SET_STATE(entry, chunk_offs, PENDING);
LWLockRelease(lfc_lock);
lfc_unlock();
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
@@ -1038,7 +1055,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (lfc_ctl->generation == generation)
{
@@ -1058,7 +1075,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
state = GET_STATE(entry, chunk_offs);
if (state == REQUESTED) {
ConditionVariableBroadcast(cv);
pthread_cond_broadcast(cv);
}
if (state != AVAILABLE)
{
@@ -1066,7 +1083,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
SET_STATE(entry, chunk_offs, AVAILABLE);
}
}
LWLockRelease(lfc_lock);
lfc_unlock();
}
return true;
}
@@ -1099,11 +1116,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
lfc_unlock();
return;
}
@@ -1122,7 +1139,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
instr_time io_start, io_end;
ConditionVariable* cv;
pthread_cond_t* cv;
Assert(blocks_in_chunk > 0);
@@ -1168,7 +1185,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
for (int i = 0; i < blocks_in_chunk; i++)
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -1178,21 +1194,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
SET_STATE(entry, chunk_offs + i, PENDING);
break;
}
if (!sleeping)
{
ConditionVariablePrepareToSleep(cv);
sleeping = true;
}
LWLockRelease(lfc_lock);
ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
}
if (sleeping)
{
ConditionVariableCancelSleep();
pthread_cond_wait(cv, &lfc_ctl->mutex);
}
}
LWLockRelease(lfc_lock);
lfc_unlock();
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
@@ -1208,7 +1213,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_write_lock();
if (lfc_ctl->generation == generation)
{
@@ -1231,7 +1236,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
FileCacheBlockState state = GET_STATE(entry, chunk_offs + i);
if (state == REQUESTED)
{
ConditionVariableBroadcast(cv);
pthread_cond_broadcast(cv);
}
if (state != AVAILABLE)
{
@@ -1246,7 +1251,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
}
LWLockRelease(lfc_lock);
lfc_unlock();
}
typedef struct
@@ -1459,7 +1464,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
if (lfc_ctl)
{
LWLockAcquire(lfc_lock, LW_SHARED);
lfc_read_lock();
if (LFC_ENABLED())
{
@@ -1511,7 +1516,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
Assert(n_pages == n);
}
if (lfc_ctl)
LWLockRelease(lfc_lock);
lfc_unlock();
}
funcctx = SRF_PERCALL_SETUP();
@@ -1554,9 +1559,9 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
lfc_read_lock();
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
lfc_unlock();
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
@@ -1571,11 +1576,11 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
lfc_write_lock();
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
lfc_unlock();
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();

View File

@@ -1119,9 +1119,7 @@ communicator_read_loop(void* arg)
{
/* result of prefetch */
pthread_mutex_lock(&mutex); /* FIXME: lfc_prefetch is using LWLock which is not thread-safe (use static variables) */
(void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since);
pthread_mutex_unlock(&mutex);
notify_backend = false;
}
else