From f9dbed33a718773f52df7eda70018bdd06bad78a Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 21 Feb 2025 08:43:43 +0200 Subject: [PATCH] Replace not-thread safe LWlocks in LFC with pthread sync primitives --- pgxn/neon/file_cache.c | 153 ++++++++++++++++++++------------------- pgxn/neon/libpagestore.c | 2 - 2 files changed, 79 insertions(+), 76 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8d83cb6d94..12cb6851e3 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -16,6 +16,7 @@ #include "postgres.h" #include +#include #include #include @@ -151,14 +152,14 @@ typedef struct FileCacheControl * algorithm */ dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ - ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + pthread_mutex_t mutex; + pthread_cond_t cv[N_COND_VARS]; /* turnstile of condition variables */ } FileCacheControl; bool lfc_store_prefetch_result; static HTAB *lfc_hash; static int lfc_desc = 0; -static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; static char *lfc_path; @@ -170,6 +171,25 @@ static shmem_request_hook_type prev_shmem_request_hook; #define LFC_ENABLED() (lfc_ctl->limit != 0) + +static void +lfc_write_lock(void) +{ + pthread_mutex_lock(&lfc_ctl->mutex); +} + +static void +lfc_read_lock(void) +{ + pthread_mutex_lock(&lfc_ctl->mutex); +} + +static void +lfc_unlock(void) +{ + pthread_mutex_unlock(&lfc_ctl->mutex); +} + /* * Local file cache is optional and Neon can work without it. * In case of any any errors with this cache, we should disable it but to not throw error. @@ -184,7 +204,7 @@ lfc_disable(char const *op) elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); /* Invalidate hash */ - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (LFC_ENABLED()) { @@ -220,7 +240,7 @@ lfc_disable(char const *op) } /* Wakeup waiting backends */ for (int i = 0; i < N_COND_VARS; i++) - ConditionVariableBroadcast(&lfc_ctl->cv[i]); + pthread_cond_broadcast(&lfc_ctl->cv[i]); } /* @@ -235,7 +255,7 @@ lfc_disable(char const *op) else close(fd); - LWLockRelease(lfc_lock); + lfc_unlock(); if (lfc_desc > 0) close(lfc_desc); @@ -244,7 +264,7 @@ lfc_disable(char const *op) } /* - * This check is done without obtaining lfc_lock, so it is unreliable + * This check is done without obtaining lock, so it is unreliable */ static bool lfc_maybe_disabled(void) @@ -293,8 +313,15 @@ lfc_shmem_startup(void) { int fd; uint32 n_chunks = SIZE_MB_TO_CHUNKS(lfc_max_size); + pthread_condattr_t attrcond; + pthread_mutexattr_t attrmutex; + + pthread_condattr_init(&attrcond); + pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED); + + pthread_mutexattr_init(&attrmutex); + pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED); - lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock"); info.keysize = sizeof(BufferTag); info.entrysize = sizeof(FileCacheEntry); @@ -310,6 +337,8 @@ lfc_shmem_startup(void) dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->holes); + pthread_mutex_init(&lfc_ctl->mutex, &attrmutex); + /* Initialize hyper-log-log structure for estimating working set size */ initSHLL(&lfc_ctl->wss_estimation); @@ -328,8 +357,9 @@ lfc_shmem_startup(void) /* Initialize turnstile of condition variables */ for (int i = 0; i < N_COND_VARS; i++) - ConditionVariableInit(&lfc_ctl->cv[i]); - + { + pthread_cond_init(&lfc_ctl->cv[i], &attrcond); + } } LWLockRelease(AddinShmemInitLock); } @@ -343,7 +373,6 @@ lfc_shmem_request(void) #endif RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry))); - RequestNamedLWLockTranche("lfc_lock", 1); } static bool @@ -382,7 +411,7 @@ lfc_change_limit_hook(int newval, void *extra) if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened()) return; - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (lfc_ctl->limit != new_size) { @@ -434,7 +463,7 @@ lfc_change_limit_hook(int newval, void *extra) } neon_log(DEBUG1, "set local file cache limit to %d", new_size); - LWLockRelease(lfc_lock); + lfc_unlock(); } void @@ -532,13 +561,13 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); - LWLockAcquire(lfc_lock, LW_SHARED); + lfc_read_lock(); if (LFC_ENABLED()) { entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); found = entry != NULL && GET_STATE(entry, chunk_offs) != UNAVAILABLE; } - LWLockRelease(lfc_lock); + lfc_unlock(); return found; } @@ -569,11 +598,11 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); - LWLockAcquire(lfc_lock, LW_SHARED); + lfc_read_lock(); if (!LFC_ENABLED()) { - LWLockRelease(lfc_lock); + lfc_unlock(); return 0; } while (true) @@ -612,7 +641,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1); } - LWLockRelease(lfc_lock); + lfc_unlock(); #ifdef USE_ASSERT_CHECKING { @@ -685,7 +714,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int iteration_hits = 0; int iteration_misses = 0; uint64 io_time_us = 0; - ConditionVariable* cv; + pthread_cond_t* cv; Assert(blocks_in_chunk > 0); @@ -699,13 +728,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); /* We can return the blocks we've read before LFC got disabled; * assuming we read any. */ if (!LFC_ENABLED()) { - LWLockRelease(lfc_lock); + lfc_unlock(); return blocks_read; } @@ -723,7 +752,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, /* Pages are not cached */ lfc_ctl->misses += blocks_in_chunk; pgBufferUsage.file_cache.misses += blocks_in_chunk; - LWLockRelease(lfc_lock); + lfc_unlock(); buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; @@ -742,7 +771,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, for (int i = 0; i < blocks_in_chunk; i++) { FileCacheBlockState state = UNAVAILABLE; - bool sleeping = false; while (lfc_ctl->generation == generation) { state = GET_STATE(entry, chunk_offs + i); @@ -751,18 +779,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } else if (state != REQUESTED) { break; } - if (!sleeping) - { - ConditionVariablePrepareToSleep(cv); - sleeping = true; - } - LWLockRelease(lfc_lock); - ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - } - if (sleeping) - { - ConditionVariableCancelSleep(); + pthread_cond_wait(cv, &lfc_ctl->mutex); } if (state == AVAILABLE) { @@ -772,7 +789,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else iteration_misses++; } - LWLockRelease(lfc_lock); + lfc_unlock(); Assert(iteration_hits + iteration_misses > 0); @@ -791,7 +808,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } /* Place entry to the head of LRU list */ - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (lfc_ctl->generation == generation) { @@ -814,11 +831,11 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else { /* generation mismatch, assume error condition */ - LWLockRelease(lfc_lock); + lfc_unlock(); return -1; } - LWLockRelease(lfc_lock); + lfc_unlock(); buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; @@ -951,7 +968,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, uint64 generation; uint32 entry_offset; instr_time io_start, io_end; - ConditionVariable* cv; + pthread_cond_t* cv; FileCacheBlockState state; XLogRecPtr lwlsn; @@ -972,11 +989,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, hash = get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (!LFC_ENABLED()) { - LWLockRelease(lfc_lock); + lfc_unlock(); return false; } lwlsn = GetLastWrittenLSN(rinfo, forknum, blkno); @@ -984,7 +1001,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, { elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn)); - LWLockRelease(lfc_lock); + lfc_unlock(); return false; } @@ -995,7 +1012,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, state = GET_STATE(entry, chunk_offs); if (state != UNAVAILABLE) { /* Do not rewrite existed LFC entry */ - LWLockRelease(lfc_lock); + lfc_unlock(); return false; } /* @@ -1013,7 +1030,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, * We can't process this chunk due to lack of space in LFC, * so skip to the next one */ - LWLockRelease(lfc_lock); + lfc_unlock(); return false; } } @@ -1023,7 +1040,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, SET_STATE(entry, chunk_offs, PENDING); - LWLockRelease(lfc_lock); + lfc_unlock(); pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); INSTR_TIME_SET_CURRENT(io_start); @@ -1038,7 +1055,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, } else { - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (lfc_ctl->generation == generation) { @@ -1058,7 +1075,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, state = GET_STATE(entry, chunk_offs); if (state == REQUESTED) { - ConditionVariableBroadcast(cv); + pthread_cond_broadcast(cv); } if (state != AVAILABLE) { @@ -1066,7 +1083,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, SET_STATE(entry, chunk_offs, AVAILABLE); } } - LWLockRelease(lfc_lock); + lfc_unlock(); } return true; } @@ -1099,11 +1116,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (!LFC_ENABLED()) { - LWLockRelease(lfc_lock); + lfc_unlock(); return; } @@ -1122,7 +1139,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); instr_time io_start, io_end; - ConditionVariable* cv; + pthread_cond_t* cv; Assert(blocks_in_chunk > 0); @@ -1168,7 +1185,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, for (int i = 0; i < blocks_in_chunk; i++) { FileCacheBlockState state = UNAVAILABLE; - bool sleeping = false; while (lfc_ctl->generation == generation) { state = GET_STATE(entry, chunk_offs + i); @@ -1178,21 +1194,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, SET_STATE(entry, chunk_offs + i, PENDING); break; } - if (!sleeping) - { - ConditionVariablePrepareToSleep(cv); - sleeping = true; - } - LWLockRelease(lfc_lock); - ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - } - if (sleeping) - { - ConditionVariableCancelSleep(); + pthread_cond_wait(cv, &lfc_ctl->mutex); } } - LWLockRelease(lfc_lock); + lfc_unlock(); pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); INSTR_TIME_SET_CURRENT(io_start); @@ -1208,7 +1213,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } else { - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_write_lock(); if (lfc_ctl->generation == generation) { @@ -1231,7 +1236,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, FileCacheBlockState state = GET_STATE(entry, chunk_offs + i); if (state == REQUESTED) { - ConditionVariableBroadcast(cv); + pthread_cond_broadcast(cv); } if (state != AVAILABLE) { @@ -1246,7 +1251,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; } - LWLockRelease(lfc_lock); + lfc_unlock(); } typedef struct @@ -1459,7 +1464,7 @@ local_cache_pages(PG_FUNCTION_ARGS) if (lfc_ctl) { - LWLockAcquire(lfc_lock, LW_SHARED); + lfc_read_lock(); if (LFC_ENABLED()) { @@ -1511,7 +1516,7 @@ local_cache_pages(PG_FUNCTION_ARGS) Assert(n_pages == n); } if (lfc_ctl) - LWLockRelease(lfc_lock); + lfc_unlock(); } funcctx = SRF_PERCALL_SETUP(); @@ -1554,9 +1559,9 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS) { int32 dc; time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0); - LWLockAcquire(lfc_lock, LW_SHARED); + lfc_read_lock(); dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); - LWLockRelease(lfc_lock); + lfc_unlock(); PG_RETURN_INT32(dc); } PG_RETURN_NULL(); @@ -1571,11 +1576,11 @@ approximate_working_set_size(PG_FUNCTION_ARGS) { int32 dc; bool reset = PG_GETARG_BOOL(0); - LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED); + lfc_write_lock(); dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1); if (reset) memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); + lfc_unlock(); PG_RETURN_INT32(dc); } PG_RETURN_NULL(); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 198881a9b3..cd5b3e6b29 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1119,9 +1119,7 @@ communicator_read_loop(void* arg) { /* result of prefetch */ - pthread_mutex_lock(&mutex); /* FIXME: lfc_prefetch is using LWLock which is not thread-safe (use static variables) */ (void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since); - pthread_mutex_unlock(&mutex); notify_backend = false; } else