diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index a61dc9f4c6..f6a577abfc 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -22,6 +22,7 @@ #include "neon_pgversioncompat.h" #include "access/parallel.h" +#include "access/xlog.h" #include "funcapi.h" #include "miscadmin.h" #include "pagestore_client.h" @@ -40,12 +41,16 @@ #include "utils/dynahash.h" #include "utils/guc.h" +#if PG_VERSION_NUM >= 150000 +#include "access/xlogrecovery.h" +#endif + #include "hll.h" #include "bitmap.h" #include "neon.h" #include "neon_perf_counters.h" -#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) +#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* * Local file cache is used to temporary store relations pages in local file system. @@ -93,7 +98,23 @@ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) -#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) + +/* + * Blocks are read or written to LFC file outside LFC critical section. + * To synchronize access to such block, writer set state of such block to PENDING. + * If some other backend (read or writer) see PENDING status, it change it to REQUESTED and start + * waiting until status is changed on conditional variable. + * When writer completes is operation, it checks if status is REQUESTED and if so, broadcast conditional variable, + * waking up all backend waiting for access to this block. + */ +typedef enum FileCacheBlockState +{ + UNAVAILABLE, /* block is not present in cache */ + AVAILABLE, /* block can be used */ + PENDING, /* block is loaded */ + REQUESTED /* some other backend is waiting for block to be loaded */ +} FileCacheBlockState; + typedef struct FileCacheEntry { @@ -101,10 +122,16 @@ typedef struct FileCacheEntry uint32 hash; uint32 offset; uint32 access_count; - uint32 bitmap[CHUNK_BITMAP_SIZE]; + uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */ dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; +#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3) +#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2)) + +#define N_COND_VARS 64 +#define CV_WAIT_TIMEOUT 10 + typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -118,18 +145,24 @@ typedef struct FileCacheControl uint64 writes; /* number of writes issued */ uint64 time_read; /* time spent reading (us) */ uint64 time_write; /* time spent writing (us) */ + uint64 resizes; /* number of LFC resizes */ + uint64 evicted_pages; /* number of evicted pages */ dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ + ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ } FileCacheControl; +bool lfc_store_prefetch_result; + static HTAB *lfc_hash; -static int lfc_desc = 0; +static int lfc_desc = -1; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; static char *lfc_path; +static uint64 lfc_generation; static FileCacheControl *lfc_ctl; static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 @@ -138,6 +171,20 @@ static shmem_request_hook_type prev_shmem_request_hook; #define LFC_ENABLED() (lfc_ctl->limit != 0) +/* + * Close LFC file if opened. + * All backends should close their LFC files once LFC is disabled. + */ +static void +lfc_close_file(void) +{ + if (lfc_desc >= 0) + { + close(lfc_desc); + lfc_desc = -1; + } +} + /* * Local file cache is optional and Neon can work without it. * In case of any any errors with this cache, we should disable it but to not throw error. @@ -145,20 +192,16 @@ static shmem_request_hook_type prev_shmem_request_hook; * All cache content should be invalidated to avoid reading of stale or corrupted data */ static void -lfc_disable(char const *op) +lfc_switch_off(void) { int fd; - elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); - - /* Invalidate hash */ - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - if (LFC_ENABLED()) { HASH_SEQ_STATUS status; FileCacheEntry *entry; + /* Invalidate hash */ hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { @@ -171,41 +214,33 @@ lfc_disable(char const *op) dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->holes); - if (lfc_desc > 0) - { - int rc; + /* + * We need to use unlink to to avoid races in LFC write, because it is not + * protected by lock + */ + unlink(lfc_path); - /* - * If the reason of error is ENOSPC, then truncation of file may - * help to reclaim some space - */ - pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_TRUNCATE); - rc = ftruncate(lfc_desc, 0); - pgstat_report_wait_end(); + fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); + if (fd < 0) + elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path); + else + close(fd); - if (rc < 0) - elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); - } + /* Wakeup waiting backends */ + for (int i = 0; i < N_COND_VARS; i++) + ConditionVariableBroadcast(&lfc_ctl->cv[i]); } + lfc_close_file(); +} - /* - * We need to use unlink to to avoid races in LFC write, because it is not - * protectedby - */ - unlink(lfc_path); - - fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); - if (fd < 0) - elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path); - else - close(fd); +static void +lfc_disable(char const *op) +{ + elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + lfc_switch_off(); LWLockRelease(lfc_lock); - - if (lfc_desc > 0) - close(lfc_desc); - - lfc_desc = -1; } /* @@ -217,13 +252,20 @@ lfc_maybe_disabled(void) return !lfc_ctl || !LFC_ENABLED(); } +/* + * Open LFC file if not opened yet or generation is changed. + * Should be called under LFC lock. + */ static bool lfc_ensure_opened(void) { - bool enabled = !lfc_maybe_disabled(); - + if (lfc_generation != lfc_ctl->generation) + { + lfc_close_file(); + lfc_generation = lfc_ctl->generation; + } /* Open cache file if not done yet */ - if (lfc_desc <= 0 && enabled) + if (lfc_desc < 0) { lfc_desc = BasicOpenFile(lfc_path, O_RDWR); @@ -233,7 +275,7 @@ lfc_ensure_opened(void) return false; } } - return enabled; + return true; } static void @@ -267,14 +309,7 @@ lfc_shmem_startup(void) n_chunks + 1, n_chunks + 1, &info, HASH_ELEM | HASH_BLOBS); - lfc_ctl->generation = 0; - lfc_ctl->size = 0; - lfc_ctl->used = 0; - lfc_ctl->hits = 0; - lfc_ctl->misses = 0; - lfc_ctl->writes = 0; - lfc_ctl->time_read = 0; - lfc_ctl->time_write = 0; + memset(lfc_ctl, 0, sizeof(FileCacheControl)); dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->holes); @@ -285,7 +320,7 @@ lfc_shmem_startup(void) fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); if (fd < 0) { - elog(WARNING, "Failed to create local file cache %s: %m", lfc_path); + elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path); lfc_ctl->limit = 0; } else @@ -293,6 +328,11 @@ lfc_shmem_startup(void) close(fd); lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit); } + + /* Initialize turnstile of condition variables */ + for (int i = 0; i < N_COND_VARS; i++) + ConditionVariableInit(&lfc_ctl->cv[i]); + } LWLockRelease(AddinShmemInitLock); } @@ -327,7 +367,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source) { if (*newval > lfc_max_size) { - elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size"); + elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size"); return false; } return true; @@ -338,14 +378,31 @@ lfc_change_limit_hook(int newval, void *extra) { uint32 new_size = SIZE_MB_TO_CHUNKS(newval); - if (!is_normal_backend()) - return; - - if (!lfc_ensure_opened()) + if (!lfc_ctl || !is_normal_backend()) return; LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + /* Open LFC file only if LFC was enabled or we are going to reenable it */ + if (newval == 0 && !LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + /* File should be reopened if LFC is reenabled */ + lfc_close_file(); + return; + } + + if (!lfc_ensure_opened()) + { + LWLockRelease(lfc_lock); + return; + } + + if (lfc_ctl->limit != new_size) + { + lfc_ctl->resizes += 1; + } + while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru)) { /* @@ -367,7 +424,9 @@ lfc_change_limit_hook(int newval, void *extra) /* We remove the old entry, and re-enter a hole to the hash table */ for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1; + bool is_page_cached = GET_STATE(victim, i) == AVAILABLE; + lfc_ctl->used_pages -= is_page_cached; + lfc_ctl->evicted_pages += is_page_cached; } hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); @@ -383,10 +442,11 @@ lfc_change_limit_hook(int newval, void *extra) lfc_ctl->used -= 1; } - lfc_ctl->limit = new_size; - if (new_size == 0) { - lfc_ctl->generation += 1; - } + if (new_size == 0) + lfc_switch_off(); + else + lfc_ctl->limit = new_size; + neon_log(DEBUG1, "set local file cache limit to %d", new_size); LWLockRelease(lfc_lock); @@ -403,6 +463,17 @@ lfc_init(void) neon_log(ERROR, "Neon module should be loaded via shared_preload_libraries"); + DefineCustomBoolVariable("neon.store_prefetch_result_in_lfc", + "Immediately store received prefetch result in LFC", + NULL, + &lfc_store_prefetch_result, + false, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + DefineCustomIntVariable("neon.max_file_cache_size", "Maximal size of Neon local file cache", NULL, @@ -480,7 +551,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) if (LFC_ENABLED()) { entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & ((uint32)1 << (chunk_offs & 31))) != 0; + found = entry != NULL && GET_STATE(entry, chunk_offs) != UNAVAILABLE; } LWLockRelease(lfc_lock); return found; @@ -529,8 +600,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++) { - if ((entry->bitmap[chunk_offs >> 5] & - ((uint32)1 << (chunk_offs & 31))) != 0) + if (GET_STATE(entry, chunk_offs) != UNAVAILABLE) { BITMAP_SET(bitmap, i); found++; @@ -541,7 +611,6 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { i += this_chunk; } - /* * Break out of the iteration before doing expensive stuff for * a next iteration @@ -577,87 +646,6 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return found; } -/* - * Evict a page (if present) from the local file cache - */ -void -lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) -{ - BufferTag tag; - FileCacheEntry *entry; - bool found; - int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); - uint32 hash; - - if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ - return; - - CopyNRelFileInfoToBufTag(tag, rinfo); - tag.forkNum = forkNum; - tag.blockNum = (blkno & ~(BLOCKS_PER_CHUNK - 1)); - - CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - hash = get_hash_value(lfc_hash, &tag); - - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - if (!LFC_ENABLED()) - { - LWLockRelease(lfc_lock); - return; - } - - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); - - if (!found) - { - /* nothing to do */ - LWLockRelease(lfc_lock); - return; - } - - /* remove the page from the cache */ - entry->bitmap[chunk_offs >> 5] &= ~((uint32)1 << (chunk_offs & (32 - 1))); - - if (entry->access_count == 0) - { - /* - * If the chunk has no live entries, we can position the chunk to be - * recycled first. - */ - if (entry->bitmap[chunk_offs >> 5] == 0) - { - bool has_remaining_pages = false; - - for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) - { - if (entry->bitmap[i] != 0) - { - has_remaining_pages = true; - break; - } - } - - /* - * Put the entry at the position that is first to be reclaimed when we - * have no cached pages remaining in the chunk - */ - if (!has_remaining_pages) - { - dlist_delete(&entry->list_node); - dlist_push_head(&lfc_ctl->lru, &entry->list_node); - } - } - } - - /* - * Done: apart from empty chunks, we don't move chunks in the LRU when - * they're empty because eviction isn't usage. - */ - - LWLockRelease(lfc_lock); -} - /* * Try to read pages from local cache. * Returns the number of pages read from the local cache, and sets bits in @@ -685,17 +673,14 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int buf_offset = 0; if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ - return 0; - - if (!lfc_ensure_opened()) - return 0; + return -1; CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - /* + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header * 2. Check if the chunk actually has the blocks we're interested in @@ -712,22 +697,35 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int iteration_hits = 0; int iteration_misses = 0; uint64 io_time_us = 0; + int n_blocks_to_read = 0; + ConditionVariable* cv; + Assert(blocks_in_chunk > 0); for (int i = 0; i < blocks_in_chunk; i++) { + n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0); iov[i].iov_base = buffers[buf_offset + i]; iov[i].iov_len = BLCKSZ; + BITMAP_CLR(mask, buf_offset + i); + } + if (n_blocks_to_read == 0) + { + buf_offset += blocks_in_chunk; + nblocks -= blocks_in_chunk; + blkno += blocks_in_chunk; + continue; } tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); + cv = &lfc_ctl->cv[hash % N_COND_VARS]; LWLockAcquire(lfc_lock, LW_EXCLUSIVE); /* We can return the blocks we've read before LFC got disabled; * assuming we read any. */ - if (!LFC_ENABLED()) + if (!LFC_ENABLED() || !lfc_ensure_opened()) { LWLockRelease(lfc_lock); return blocks_read; @@ -763,15 +761,32 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, generation = lfc_ctl->generation; entry_offset = entry->offset; - LWLockRelease(lfc_lock); - for (int i = 0; i < blocks_in_chunk; i++) { - /* - * If the page is valid, we consider it "read". - * All other pages will be fetched separately by the next cache - */ - if (entry->bitmap[(chunk_offs + i) / 32] & ((uint32)1 << ((chunk_offs + i) % 32))) + FileCacheBlockState state = UNAVAILABLE; + bool sleeping = false; + while (lfc_ctl->generation == generation) + { + state = GET_STATE(entry, chunk_offs + i); + if (state == PENDING) { + SET_STATE(entry, chunk_offs + i, REQUESTED); + } else if (state != REQUESTED) { + break; + } + if (!sleeping) + { + ConditionVariablePrepareToSleep(cv); + sleeping = true; + } + LWLockRelease(lfc_lock); + ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + } + if (sleeping) + { + ConditionVariableCancelSleep(); + } + if (state == AVAILABLE) { BITMAP_SET(mask, buf_offset + i); iteration_hits++; @@ -779,6 +794,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else iteration_misses++; } + LWLockRelease(lfc_lock); Assert(iteration_hits + iteration_misses > 0); @@ -820,6 +836,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else { /* generation mismatch, assume error condition */ + lfc_close_file(); LWLockRelease(lfc_lock); return -1; } @@ -835,6 +852,249 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return blocks_read; } +/* + * Initialize new LFC hash entry, perform eviction if needed. + * Returns false if there are no unpinned entries and chunk can not be added. + */ +static bool +lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) +{ + /*----------- + * If the chunk wasn't already in the LFC then we have these + * options, in order of preference: + * + * Unless there is no space available, we can: + * 1. Use an entry from the `holes` list, and + * 2. Create a new entry. + * We can always, regardless of space in the LFC: + * 3. evict an entry from LRU, and + * 4. ignore the write operation (the least favorite option) + */ + if (lfc_ctl->used < lfc_ctl->limit) + { + if (!dlist_is_empty(&lfc_ctl->holes)) + { + /* We can reuse a hole that was left behind when the LFC was shrunk previously */ + FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, + dlist_pop_head_node(&lfc_ctl->holes)); + uint32 offset = hole->offset; + bool hole_found; + + hash_search_with_hash_value(lfc_hash, &hole->key, + hole->hash, HASH_REMOVE, &hole_found); + CriticalAssert(hole_found); + + lfc_ctl->used += 1; + entry->offset = offset; /* reuse the hole */ + } + else + { + lfc_ctl->used += 1; + entry->offset = lfc_ctl->size++;/* allocate new chunk at end + * of file */ + } + } + /* + * We've already used up all allocated LFC entries. + * + * If we can clear an entry from the LRU, do that. + * If we can't (e.g. because all other slots are being accessed) + * then we will remove this entry from the hash and continue + * on to the next chunk, as we may not exceed the limit. + */ + else if (!dlist_is_empty(&lfc_ctl->lru)) + { + /* Cache overflow: evict least recently used chunk */ + FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, + dlist_pop_head_node(&lfc_ctl->lru)); + + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + { + bool is_page_cached = GET_STATE(victim, i) == AVAILABLE; + lfc_ctl->used_pages -= is_page_cached; + lfc_ctl->evicted_pages += is_page_cached; + } + + CriticalAssert(victim->access_count == 0); + entry->offset = victim->offset; /* grab victim's chunk */ + hash_search_with_hash_value(lfc_hash, &victim->key, + victim->hash, HASH_REMOVE, NULL); + neon_log(DEBUG2, "Swap file cache page"); + } + else + { + /* Can't add this chunk - we don't have the space for it */ + hash_search_with_hash_value(lfc_hash, &entry->key, hash, + HASH_REMOVE, NULL); + + return false; + } + + entry->access_count = 1; + entry->hash = hash; + + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + SET_STATE(entry, i, UNAVAILABLE); + + return true; +} + +/* + * Store received prefetch result in LFC cache. + * Unlike lfc_read/lfc_write this call is is not protected by shared buffer lock. + * So we should be ready that other backends will try to concurrently read or write this block. + * We do not store prefetched block if it already exists in LFC or it's not_modified_since LSN is smaller + * than current last written LSN (LwLSN). + * + * We can enforce correctness of storing page in LFC by the following steps: + * 1. Check under LFC lock that page in not present in LFC. + * 2. Check under LFC lock that LwLSN is not changed since prefetch request time (not_modified_since). + * 3. Change page state to "Pending" under LFC lock to prevent all other backends to read or write this + * pages until this write is completed. + * 4. Assume that some other backend creates new image of the page without reading it + * (because reads will be blocked because of 2). This version of the page is stored in shared buffer. + * Any attempt to throw away this page from shared buffer will be blocked, because Postgres first + * needs to save dirty page and write will be blocked because of 2. + * So any backend trying to access this page, will take it from shared buffer without accessing + * SMGR and LFC. + * 5. After write completion we once again obtain LFC lock and wake-up all waiting backends. + * If there is some backend waiting to write new image of the page (4) then now it will be able to + * do it,overwriting old (prefetched) page image. As far as this write will be completed before + * shared buffer can be reassigned, not other backend can see old page image. +*/ +bool +lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, + const void* buffer, XLogRecPtr lsn) +{ + BufferTag tag; + FileCacheEntry *entry; + ssize_t rc; + bool found; + uint32 hash; + uint64 generation; + uint32 entry_offset; + instr_time io_start, io_end; + ConditionVariable* cv; + FileCacheBlockState state; + XLogRecPtr lwlsn; + + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + return false; + + CopyNRelFileInfoToBufTag(tag, rinfo); + tag.forkNum = forknum; + + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + hash = get_hash_value(lfc_hash, &tag); + cv = &lfc_ctl->cv[hash % N_COND_VARS]; + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED() || !lfc_ensure_opened()) + { + LWLockRelease(lfc_lock); + return false; + } + lwlsn = GetLastWrittenLSN(rinfo, forknum, blkno); + if (lwlsn > lsn) + { + elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", + blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn)); + LWLockRelease(lfc_lock); + return false; + } + + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + + if (found) + { + state = GET_STATE(entry, chunk_offs); + if (state != UNAVAILABLE) { + /* Do not rewrite existed LFC entry */ + LWLockRelease(lfc_lock); + return false; + } + /* + * Unlink entry from LRU list to pin it for the duration of IO + * operation + */ + if (entry->access_count++ == 0) + dlist_delete(&entry->list_node); + } + else + { + if (!lfc_init_new_entry(entry, hash)) + { + /* + * We can't process this chunk due to lack of space in LFC, + * so skip to the next one + */ + LWLockRelease(lfc_lock); + return false; + } + } + + generation = lfc_ctl->generation; + entry_offset = entry->offset; + + SET_STATE(entry, chunk_offs, PENDING); + + LWLockRelease(lfc_lock); + + pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); + INSTR_TIME_SET_CURRENT(io_start); + rc = pwrite(lfc_desc, buffer, BLCKSZ, + ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); + INSTR_TIME_SET_CURRENT(io_end); + pgstat_report_wait_end(); + + if (rc != BLCKSZ) + { + lfc_disable("write"); + } + else + { + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (lfc_ctl->generation == generation) + { + uint64 time_spent_us; + CriticalAssert(LFC_ENABLED()); + /* Place entry to the head of LRU list */ + CriticalAssert(entry->access_count > 0); + + lfc_ctl->writes += 1; + INSTR_TIME_SUBTRACT(io_start, io_end); + time_spent_us = INSTR_TIME_GET_MICROSEC(io_start); + lfc_ctl->time_write += time_spent_us; + inc_page_cache_write_wait(time_spent_us); + + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + + state = GET_STATE(entry, chunk_offs); + if (state == REQUESTED) { + ConditionVariableBroadcast(cv); + } + if (state != AVAILABLE) + { + lfc_ctl->used_pages += 1; + SET_STATE(entry, chunk_offs, AVAILABLE); + } + } + else + { + lfc_close_file(); + } + LWLockRelease(lfc_lock); + } + return true; +} + /* * Put page in local file cache. * If cache is full then evict some other page. @@ -855,15 +1115,21 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; - if (!lfc_ensure_opened()) - return; - CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - /* + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED() || !lfc_ensure_opened()) + { + LWLockRelease(lfc_lock); + return; + } + generation = lfc_ctl->generation; + + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header * 2. Check if the chunk actually has the blocks we're interested in @@ -878,6 +1144,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); instr_time io_start, io_end; + ConditionVariable* cv; + Assert(blocks_in_chunk > 0); for (int i = 0; i < blocks_in_chunk; i++) @@ -888,14 +1156,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); hash = get_hash_value(lfc_hash, &tag); - - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - if (!LFC_ENABLED()) - { - LWLockRelease(lfc_lock); - return; - } + cv = &lfc_ctl->cv[hash % N_COND_VARS]; entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); @@ -908,92 +1169,50 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (entry->access_count++ == 0) dlist_delete(&entry->list_node); } - /*----------- - * If the chunk wasn't already in the LFC then we have these - * options, in order of preference: - * - * Unless there is no space available, we can: - * 1. Use an entry from the `holes` list, and - * 2. Create a new entry. - * We can always, regardless of space in the LFC: - * 3. evict an entry from LRU, and - * 4. ignore the write operation (the least favorite option) - */ - else if (lfc_ctl->used < lfc_ctl->limit) - { - if (!dlist_is_empty(&lfc_ctl->holes)) - { - /* We can reuse a hole that was left behind when the LFC was shrunk previously */ - FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, - dlist_pop_head_node(&lfc_ctl->holes)); - uint32 offset = hole->offset; - bool hole_found; - - hash_search_with_hash_value(lfc_hash, &hole->key, - hole->hash, HASH_REMOVE, &hole_found); - CriticalAssert(hole_found); - - lfc_ctl->used += 1; - entry->offset = offset; /* reuse the hole */ - } - else - { - lfc_ctl->used += 1; - entry->offset = lfc_ctl->size++;/* allocate new chunk at end - * of file */ - } - } - /* - * We've already used up all allocated LFC entries. - * - * If we can clear an entry from the LRU, do that. - * If we can't (e.g. because all other slots are being accessed) - * then we will remove this entry from the hash and continue - * on to the next chunk, as we may not exceed the limit. - */ - else if (!dlist_is_empty(&lfc_ctl->lru)) - { - /* Cache overflow: evict least recently used chunk */ - FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, - dlist_pop_head_node(&lfc_ctl->lru)); - - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) - { - lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1; - } - - CriticalAssert(victim->access_count == 0); - entry->offset = victim->offset; /* grab victim's chunk */ - hash_search_with_hash_value(lfc_hash, &victim->key, - victim->hash, HASH_REMOVE, NULL); - neon_log(DEBUG2, "Swap file cache page"); - } else { - /* Can't add this chunk - we don't have the space for it */ - hash_search_with_hash_value(lfc_hash, &entry->key, hash, - HASH_REMOVE, NULL); - - /* - * We can't process this chunk due to lack of space in LFC, - * so skip to the next one - */ - LWLockRelease(lfc_lock); - blkno += blocks_in_chunk; - buf_offset += blocks_in_chunk; - nblocks -= blocks_in_chunk; - continue; + if (!lfc_init_new_entry(entry, hash)) + { + /* + * We can't process this chunk due to lack of space in LFC, + * so skip to the next one + */ + blkno += blocks_in_chunk; + buf_offset += blocks_in_chunk; + nblocks -= blocks_in_chunk; + continue; + } } - if (!found) - { - entry->access_count = 1; - entry->hash = hash; - memset(entry->bitmap, 0, sizeof entry->bitmap); - } - - generation = lfc_ctl->generation; entry_offset = entry->offset; + + for (int i = 0; i < blocks_in_chunk; i++) + { + FileCacheBlockState state = UNAVAILABLE; + bool sleeping = false; + while (lfc_ctl->generation == generation) + { + state = GET_STATE(entry, chunk_offs + i); + if (state == PENDING) { + SET_STATE(entry, chunk_offs + i, REQUESTED); + } else if (state != REQUESTED) { + SET_STATE(entry, chunk_offs + i, PENDING); + break; + } + if (!sleeping) + { + ConditionVariablePrepareToSleep(cv); + sleeping = true; + } + LWLockRelease(lfc_lock); + ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + } + if (sleeping) + { + ConditionVariableCancelSleep(); + } + } LWLockRelease(lfc_lock); pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); @@ -1006,6 +1225,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (rc != BLCKSZ * blocks_in_chunk) { lfc_disable("write"); + return; } else { @@ -1029,18 +1249,30 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, for (int i = 0; i < blocks_in_chunk; i++) { - lfc_ctl->used_pages += 1 - ((entry->bitmap[(chunk_offs + i) >> 5] >> ((chunk_offs + i) & 31)) & 1); - entry->bitmap[(chunk_offs + i) >> 5] |= - ((uint32)1 << ((chunk_offs + i) & 31)); + FileCacheBlockState state = GET_STATE(entry, chunk_offs + i); + if (state == REQUESTED) + { + ConditionVariableBroadcast(cv); + } + if (state != AVAILABLE) + { + lfc_ctl->used_pages += 1; + SET_STATE(entry, chunk_offs + i, AVAILABLE); + } } } - - LWLockRelease(lfc_lock); + else + { + /* stop iteration if LFC was disabled */ + lfc_close_file(); + break; + } } blkno += blocks_in_chunk; buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; } + LWLockRelease(lfc_lock); } typedef struct @@ -1127,6 +1359,16 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) if (lfc_ctl) value = lfc_ctl->used_pages; break; + case 6: + key = "file_cache_evicted_pages"; + if (lfc_ctl) + value = lfc_ctl->evicted_pages; + break; + case 7: + key = "file_cache_limit"; + if (lfc_ctl) + value = lfc_ctl->limit; + break; default: SRF_RETURN_DONE(funcctx); } @@ -1250,8 +1492,8 @@ local_cache_pages(PG_FUNCTION_ARGS) hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { - for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) - n_pages += pg_popcount32(entry->bitmap[i]); + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + n_pages += GET_STATE(entry, i) == AVAILABLE; } } } @@ -1279,7 +1521,7 @@ local_cache_pages(PG_FUNCTION_ARGS) { for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - if (entry->bitmap[i >> 5] & ((uint32)1 << (i & 31))) + if (GET_STATE(entry, i) == AVAILABLE) { fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i; fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index ce2938cfd5..700a942284 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; uint32 WAIT_EVENT_NEON_LFC_READ; uint32 WAIT_EVENT_NEON_LFC_TRUNCATE; uint32 WAIT_EVENT_NEON_LFC_WRITE; +uint32 WAIT_EVENT_NEON_LFC_CV_WAIT; uint32 WAIT_EVENT_NEON_PS_STARTING; uint32 WAIT_EVENT_NEON_PS_CONFIGURING; uint32 WAIT_EVENT_NEON_PS_SEND; @@ -538,6 +539,7 @@ neon_shmem_startup_hook(void) WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read"); WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate"); WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write"); + WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait"); WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting"); WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring"); WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO"); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 79aa88b8d3..912e09c3d3 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; extern uint32 WAIT_EVENT_NEON_LFC_READ; extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE; extern uint32 WAIT_EVENT_NEON_LFC_WRITE; +extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT; extern uint32 WAIT_EVENT_NEON_PS_STARTING; extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING; extern uint32 WAIT_EVENT_NEON_PS_SEND; @@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL; #define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ #define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE #define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE +#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ #define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION #define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION #define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 7b748d7252..9faab1e4f0 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -233,6 +233,7 @@ extern char *neon_timeline; extern char *neon_tenant; extern int32 max_cluster_size; extern int neon_protocol_version; +extern bool lfc_store_prefetch_result; extern shardno_t get_shard_number(BufferTag* tag); @@ -301,14 +302,16 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno); extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int nblocks, bits8 *bitmap); -extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno); extern void lfc_init(void); +extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, + const void* buffer, XLogRecPtr lsn); + static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, void *buffer) { - bits8 rv = 0; + bits8 rv = 1; return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1; } diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 6c812f347f..4a79acd777 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -162,7 +162,7 @@ static uint32 local_request_counter; * UNUSED ------> REQUESTED --> RECEIVED * ^ : | | * | : v | - * | : TAG_UNUSED | + * | : TAG_REMAINS | * | : | | * +----------------+------------+ * : @@ -181,7 +181,7 @@ typedef enum PrefetchStatus /* must fit in uint8; bits 0x1 are used */ typedef enum { PRFSF_NONE = 0x0, - PRFSF_SEQ = 0x1, + PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */ } PrefetchRequestFlags; typedef struct PrefetchRequest @@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum, static void neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, neon_request_lsns *output, - BlockNumber nblocks, const bits8 *mask); + BlockNumber nblocks); static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns, PrefetchRequest *slot); @@ -363,6 +363,7 @@ compact_prefetch_buffers(void) target_slot->buftag = source_slot->buftag; target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; + target_slot->flags = source_slot->flags; target_slot->response = source_slot->response; target_slot->reqid = source_slot->reqid; target_slot->request_lsns = source_slot->request_lsns; @@ -452,6 +453,18 @@ prefetch_pump_state(void) /* update slot state */ slot->status = PRFS_RECEIVED; slot->response = response; + + if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) + { + /* + * Store prefetched result in LFC (please read comments to lfc_prefetch + * explaining why it can be done without holding shared buffer lock + */ + if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since)) + { + slot->flags |= PRFSF_LFC; + } + } } } @@ -713,6 +726,18 @@ prefetch_read(PrefetchRequest *slot) /* update slot state */ slot->status = PRFS_RECEIVED; slot->response = response; + + if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) + { + /* + * Store prefetched result in LFC (please read comments to lfc_prefetch + * explaining why it can be done without holding shared buffer lock + */ + if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since)) + { + slot->flags |= PRFSF_LFC; + } + } return true; } else @@ -864,7 +889,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns else neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, - &slot->request_lsns, 1, NULL); + &slot->request_lsns, 1); request.hdr.lsn = slot->request_lsns.request_lsn; request.hdr.not_modified_since = slot->request_lsns.not_modified_since; @@ -890,6 +915,73 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(!found); } +/* + * Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted. + * Present pages are marked in "mask" bitmap and total number of such pages is returned. + */ +static int +prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns, + BlockNumber nblocks, void **buffers, bits8 *mask) +{ + int hits = 0; + PrefetchRequest hashkey; + + /* + * Use an intermediate PrefetchRequest struct as the hash key to ensure + * correct alignment and that the padding bytes are cleared. + */ + memset(&hashkey.buftag, 0, sizeof(BufferTag)); + CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo); + hashkey.buftag.forkNum = forknum; + + for (int i = 0; i < nblocks; i++) + { + PrfHashEntry *entry; + + hashkey.buftag.blockNum = blocknum + i; + entry = prfh_lookup(MyPState->prf_hash, &hashkey); + + if (entry != NULL) + { + PrefetchRequest *slot = entry->slot; + uint64 ring_index = slot->my_ring_index; + Assert(slot == GetPrfSlot(ring_index)); + + Assert(slot->status != PRFS_UNUSED); + Assert(MyPState->ring_last <= ring_index && + ring_index < MyPState->ring_unused); + Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag)); + + if (slot->status != PRFS_RECEIVED) + continue; + + /* + * If the caller specified a request LSN to use, only accept + * prefetch responses that satisfy that request. + */ + if (!neon_prefetch_response_usable(&lsns[i], slot)) + continue; + + memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ); + prefetch_set_unused(ring_index); + BITMAP_SET(mask, i); + + hits += 1; + } + } + pgBufferUsage.prefetch.hits += hits; + return hits; +} + +#if PG_MAJORVERSION_NUM < 17 +static bool +prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer) +{ + bits8 present = 0; + return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0; +} +#endif + /* * prefetch_register_bufferv() - register and prefetch buffers * @@ -1013,8 +1105,6 @@ Retry: /* The buffered request is good enough, return that index */ if (is_prefetch) pgBufferUsage.prefetch.duplicates++; - else - pgBufferUsage.prefetch.hits++; continue; } } @@ -1116,6 +1206,7 @@ Retry: slot->buftag = hashkey.buftag; slot->shard_no = get_shard_number(&tag); slot->my_ring_index = ring_index; + slot->flags = 0; min_ring_index = Min(min_ring_index, ring_index); @@ -2056,8 +2147,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum, */ static void neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, - neon_request_lsns *output, BlockNumber nblocks, - const bits8 *mask) + neon_request_lsns *output, BlockNumber nblocks) { XLogRecPtr last_written_lsns[PG_IOV_MAX]; @@ -2145,9 +2235,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, neon_request_lsns *result = &output[i]; XLogRecPtr last_written_lsn = last_written_lsns[i]; - if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i)) - continue; - if (last_written_lsn > replay_lsn) { /* GetCurrentReplayRecPtr was introduced in v15 */ @@ -2190,8 +2277,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, neon_request_lsns *result = &output[i]; XLogRecPtr last_written_lsn = last_written_lsns[i]; - if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i)) - continue; /* * Use the latest LSN that was evicted from the buffer cache as the * 'not_modified_since' hint. Any pages modified by later WAL records @@ -2413,7 +2498,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) } neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, - REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); + REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1); { NeonExistsRequest request = { .hdr.tag = T_NeonExistsRequest, @@ -2832,8 +2917,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, while (nblocks > 0) { int iterblocks = Min(nblocks, PG_IOV_MAX); - bits8 lfc_present[PG_IOV_MAX / 8]; - memset(lfc_present, 0, sizeof(lfc_present)); + bits8 lfc_present[PG_IOV_MAX / 8] = {0}; if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum, iterblocks, lfc_present) == iterblocks) @@ -2844,12 +2928,13 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } tag.blockNum = blocknum; - + for (int i = 0; i < PG_IOV_MAX / 8; i++) lfc_present[i] = ~(lfc_present[i]); ring_index = prefetch_register_bufferv(tag, NULL, iterblocks, lfc_present, true); + nblocks -= iterblocks; blocknum += iterblocks; @@ -3105,7 +3190,8 @@ Retry: } } memcpy(buffer, getpage_resp->page, BLCKSZ); - lfc_write(rinfo, forkNum, blockno, buffer); + if (!lfc_store_prefetch_result) + lfc_write(rinfo, forkNum, blockno, buffer); break; } case T_NeonErrorResponse: @@ -3190,6 +3276,17 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } + /* Try to read PS results if they are available */ + prefetch_pump_state(); + + neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1); + + if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer)) + { + /* Prefetch hit */ + return; + } + /* Try to read from local file cache */ if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer)) { @@ -3197,9 +3294,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer return; } - neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL); neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer); + /* + * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. + */ prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL @@ -3280,11 +3379,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer #if PG_MAJORVERSION_NUM >= 17 static void neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - void **buffers, BlockNumber nblocks) + void **buffers, BlockNumber nblocks) { + bits8 prefetch_hits[PG_IOV_MAX / 8] = {0}; + bits8 lfc_hits[PG_IOV_MAX / 8]; bits8 read[PG_IOV_MAX / 8]; neon_request_lsns request_lsns[PG_IOV_MAX]; int lfc_result; + int prefetch_result; switch (reln->smgr_relpersistence) { @@ -3307,38 +3409,52 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_log(ERROR, "Read request too large: %d is larger than max %d", nblocks, PG_IOV_MAX); - memset(read, 0, sizeof(read)); + /* Try to read PS results if they are available */ + prefetch_pump_state(); + + neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum, + request_lsns, nblocks); + + + prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits); + + if (prefetch_result == nblocks) + return; + + /* invert the result: exclude prefetched blocks */ + for (int i = 0; i < PG_IOV_MAX / 8; i++) + lfc_hits[i] = ~prefetch_hits[i]; /* Try to read from local file cache */ lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers, - nblocks, read); + nblocks, lfc_hits); if (lfc_result > 0) MyNeonCounters->file_cache_hits_total += lfc_result; /* Read all blocks from LFC, so we're done */ - if (lfc_result == nblocks) + if (prefetch_result + lfc_result == nblocks) return; - if (lfc_result == -1) + if (lfc_result <= 0) { /* can't use the LFC result, so read all blocks from PS */ for (int i = 0; i < PG_IOV_MAX / 8; i++) - read[i] = 0xFF; + read[i] = ~prefetch_hits[i]; } else { /* invert the result: exclude blocks read from lfc */ for (int i = 0; i < PG_IOV_MAX / 8; i++) - read[i] = ~(read[i]); + read[i] = ~(prefetch_hits[i] | lfc_hits[i]); } - neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum, - request_lsns, nblocks, read); - neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, buffers, nblocks, read); + /* + * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. + */ prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL @@ -3610,7 +3726,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) } neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, - REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); + REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1); { NeonNblocksRequest request = { @@ -3695,7 +3811,7 @@ neon_dbsize(Oid dbNode) NRelFileInfo dummy_node = {0}; neon_get_request_lsns(dummy_node, MAIN_FORKNUM, - REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL); + REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1); { NeonDbSizeRequest request = { @@ -4430,7 +4546,12 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) if (no_redo_needed) { SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno); - lfc_evict(rinfo, forknum, blkno); + /* + * Redo changes if page exists in LFC. + * We should perform this check after assigning LwLSN to prevent + * prefetching of some older version of the page by some other backend. + */ + no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); } LWLockRelease(partitionLock); diff --git a/test_runner/regress/test_lfc_prefetch.py b/test_runner/regress/test_lfc_prefetch.py new file mode 100644 index 0000000000..dd422d996e --- /dev/null +++ b/test_runner/regress/test_lfc_prefetch.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import USE_LFC + + +@pytest.mark.timeout(600) +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prefetch(neon_simple_env: NeonEnv): + """ + Test resizing the Local File Cache + """ + env = neon_simple_env + endpoint = env.endpoints.create_start( + "main", + config_lines=[ + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "effective_io_concurrency=100", + "shared_buffers=1MB", + "enable_bitmapscan=off", + "enable_seqscan=off", + "autovacuum=off", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon") + cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))") + cur.execute("set statement_timeout=0") + cur.execute("select setseed(0.5)") + cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)") + cur.execute("create index on t(sk)") + cur.execute("vacuum t") + + # reset LFC + cur.execute("alter system set neon.file_cache_size_limit=0") + cur.execute("select pg_reload_conf()") + time.sleep(1) + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s1" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 200000 and 300000 limit 100) s2" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 300000 and 400000 limit 100) s3" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s4" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + # if prefetch requests are not stored in LFC, we continue to sent unused prefetch request tyo PS + assert prefetch_expired > 0 + + cur.execute("set neon.store_prefetch_result_in_lfc=on") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s5" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 600000 and 700000 limit 100) s6" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 700000 and 800000 limit 100) s7" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + cur.execute( + "explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s8" + ) + prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"] + log.info(f"Unused prefetches: {prefetch_expired}") + + # No redundant prefethc requrests if prefetch results are stored in LFC + assert prefetch_expired == 0