From 140c0edac8a8322efaf88fc15d11977b077869a5 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 27 Dec 2022 14:42:51 +0200 Subject: [PATCH] Yet another port of local file system cache (#2622) --- pgxn/neon/Makefile | 3 +- pgxn/neon/file_cache.c | 597 +++++++++++++++++++++++++++++++++++ pgxn/neon/libpagestore.c | 1 + pgxn/neon/neon--1.0.sql | 10 + pgxn/neon/pagestore_client.h | 7 + pgxn/neon/pagestore_smgr.c | 14 + 6 files changed, 631 insertions(+), 1 deletion(-) create mode 100644 pgxn/neon/file_cache.c diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 7f4e30a12e..ec377dbb1e 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -4,11 +4,12 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ + file_cache.o \ libpagestore.o \ libpqwalproposer.o \ + neon.o \ pagestore_smgr.o \ relsize_cache.o \ - neon.o \ walproposer.o \ walproposer_utils.o diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c new file mode 100644 index 0000000000..96c2461e2d --- /dev/null +++ b/pgxn/neon/file_cache.c @@ -0,0 +1,597 @@ +/* + * + * file_cache.c + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * pgxn/neon/file_cache.c + * + *------------------------------------------------------------------------- + */ + +#include +#include +#include + +#include "postgres.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "pagestore_client.h" +#include "access/parallel.h" +#include "postmaster/bgworker.h" +#include "storage/relfilenode.h" +#include "storage/buf_internals.h" +#include "storage/latch.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "utils/dynahash.h" +#include "utils/guc.h" +#include "storage/fd.h" +#include "storage/pg_shmem.h" +#include "storage/buf_internals.h" + +/* + * Local file cache is used to temporary store relations pages in local file system. + * All blocks of all relations are stored inside one file and addressed using shared hash map. + * Currently LRU eviction policy based on L2 list is used as replacement algorithm. + * As far as manipulation of L2-list requires global critical section, we are not using partitioned hash. + * Also we are using exclusive lock even for read operation because LRU requires relinking element in L2 list. + * If this lock become a bottleneck, we can consider other eviction strategies, for example clock algorithm. + * + * Cache is always reconstructed at node startup, so we do not need to save mapping somewhere and worry about + * its consistency. + */ + +/* Local file storage allocation chunk. + * Should be power of two and not less than 32. Using larger than page chunks can + * 1. Reduce hash-map memory footprint: 8TB database contains billion pages + * and size of hash entry is 40 bytes, so we need 40Gb just for hash map. + * 1Mb chunks can reduce hash map size to 320Mb. + * 2. 
Improve access locality, subsequent pages will be allocated together improving seqscan speed + */ +#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ +#define MB ((uint64)1024*1024) + +#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) + +typedef struct FileCacheEntry +{ + BufferTag key; + uint32 offset; + uint32 access_count; + uint32 bitmap[BLOCKS_PER_CHUNK/32]; + dlist_node lru_node; /* LRU list node */ +} FileCacheEntry; + +typedef struct FileCacheControl +{ + uint32 size; /* size of cache file in chunks */ + dlist_head lru; /* double linked list for LRU replacement algorithm */ +} FileCacheControl; + +static HTAB* lfc_hash; +static int lfc_desc; +static LWLockId lfc_lock; +static int lfc_max_size; +static int lfc_size_limit; +static char* lfc_path; +static FileCacheControl* lfc_ctl; +static shmem_startup_hook_type prev_shmem_startup_hook; +#if PG_VERSION_NUM>=150000 +static shmem_request_hook_type prev_shmem_request_hook; +#endif + +static void +lfc_shmem_startup(void) +{ + bool found; + static HASHCTL info; + + if (prev_shmem_startup_hook) + { + prev_shmem_startup_hook(); + } + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); + if (!found) + { + uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); + lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock"); + info.keysize = sizeof(BufferTag); + info.entrysize = sizeof(FileCacheEntry); + lfc_hash = ShmemInitHash("lfc_hash", + /* lfc_size+1 because we add new element to hash table before eviction of victim */ + lfc_size+1, lfc_size+1, + &info, + HASH_ELEM | HASH_BLOBS); + lfc_ctl->size = 0; + dlist_init(&lfc_ctl->lru); + + /* Remove file cache on restart */ + (void)unlink(lfc_path); + } + LWLockRelease(AddinShmemInitLock); +} + +static void +lfc_shmem_request(void) +{ +#if PG_VERSION_NUM>=150000 + if (prev_shmem_request_hook) + prev_shmem_request_hook(); +#endif + + RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size)+1, sizeof(FileCacheEntry))); + RequestNamedLWLockTranche("lfc_lock", 1); +} + +bool +lfc_check_limit_hook(int *newval, void **extra, GucSource source) +{ + if (*newval > lfc_max_size) + { + elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size"); + return false; + } + return true; +} + +void +lfc_change_limit_hook(int newval, void *extra) +{ + uint32 new_size = SIZE_MB_TO_CHUNKS(newval); + /* + * Stats collector detach shared memory, so we should not try to access shared memory here. + * Parallel workers first assign default value (0), so not perform truncation in parallel workers. 
+ */ + if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker()) + return; + + /* Open cache file if not done yet */ + if (lfc_desc == 0) + { + lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); + if (lfc_desc < 0) { + elog(LOG, "Failed to open file cache %s: %m", lfc_path); + lfc_size_limit = 0; /* disable file cache */ + return; + } + } + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + while (new_size < lfc_ctl->size && !dlist_is_empty(&lfc_ctl->lru)) + { + /* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */ + FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + Assert(victim->access_count == 0); +#ifdef FALLOC_FL_PUNCH_HOLE + if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0) + elog(LOG, "Failed to punch hole in file: %m"); +#endif + hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + lfc_ctl->size -= 1; + } + elog(LOG, "set local file cache limit to %d", new_size); + LWLockRelease(lfc_lock); +} + +void +lfc_init(void) +{ + /* + * In order to create our shared memory area, we have to be loaded via + * shared_preload_libraries. + */ + if (!process_shared_preload_libraries_in_progress) + elog(ERROR, "Neon module should be loaded via shared_preload_libraries"); + + DefineCustomIntVariable("neon.max_file_cache_size", + "Maximal size of Neon local file cache", + NULL, + &lfc_max_size, + 0, /* disabled by default */ + 0, + INT_MAX, + PGC_POSTMASTER, + GUC_UNIT_MB, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("neon.file_cache_size_limit", + "Current limit for size of Neon local file cache", + NULL, + &lfc_size_limit, + 0, /* disabled by default */ + 0, + INT_MAX, + PGC_SIGHUP, + GUC_UNIT_MB, + NULL, + lfc_change_limit_hook, + NULL); + + DefineCustomStringVariable("neon.file_cache_path", + "Path to local file cache (can be raw device)", + NULL, + &lfc_path, + "file.cache", + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + + if (lfc_max_size == 0) + return; + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = lfc_shmem_startup; +#if PG_VERSION_NUM>=150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = lfc_shmem_request; +#else + lfc_shmem_request(); +#endif +} + +/* + * Check if page is present in the cache. + * Returns true if page is found in local cache. + */ +bool +lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno) +{ + BufferTag tag; + FileCacheEntry* entry; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); + bool found; + uint32 hash; + + if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + return false; + + tag.rnode = rnode; + tag.forkNum = forkNum; + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + hash = get_hash_value(lfc_hash, &tag); + + LWLockAcquire(lfc_lock, LW_SHARED); + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + LWLockRelease(lfc_lock); + return found; +} + +/* + * Try to read page from local cache. + * Returns true if page is found in local cache. + * In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache. 
+ */ +bool +lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, + char *buffer) +{ + BufferTag tag; + FileCacheEntry* entry; + ssize_t rc; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); + bool result = true; + uint32 hash; + + if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + return false; + + tag.rnode = rnode; + tag.forkNum = forkNum; + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + hash = get_hash_value(lfc_hash, &tag); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0) + { + /* Page is not cached */ + LWLockRelease(lfc_lock); + return false; + } + /* Unlink entry from LRU list to pin it for the duration of IO operation */ + if (entry->access_count++ == 0) + dlist_delete(&entry->lru_node); + LWLockRelease(lfc_lock); + + /* Open cache file if not done yet */ + if (lfc_desc == 0) + { + lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); + if (lfc_desc < 0) { + elog(LOG, "Failed to open file cache %s: %m", lfc_path); + lfc_size_limit = 0; /* disable file cache */ + result = false; + } + } + + if (lfc_desc > 0) + { + rc = pread(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); + if (rc != BLCKSZ) + { + elog(INFO, "Failed to read file cache: %m"); + lfc_size_limit = 0; /* disable file cache */ + result = false; + } + } + + /* Place entry to the head of LRU list */ + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + Assert(entry->access_count > 0); + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + LWLockRelease(lfc_lock); + + return result; +} + +/* + * Put page in local file cache. + * If cache is full then evict some other page. + */ +void +lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, + char *buffer) +{ + BufferTag tag; + FileCacheEntry* entry; + ssize_t rc; + bool found; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); + uint32 hash; + + if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + return; + + tag.rnode = rnode; + tag.forkNum = forkNum; + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); + hash = get_hash_value(lfc_hash, &tag); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + + if (found) + { + /* Unlink entry from LRU list to pin it for the duration of IO operation */ + if (entry->access_count++ == 0) + dlist_delete(&entry->lru_node); + } + else + { + /* + * We have two choices if all cache pages are pinned (i.e. used in IO operations): + * 1. Wait until some of this operation is completed and pages is unpinned + * 2. Allocate one more chunk, so that specified cache size is more recommendation than hard limit. + * As far as probability of such event (that all pages are pinned) is considered to be very very small: + * there are should be very large number of concurrent IO operations and them are limited by max_connections, + * we prefer not to complicate code and use second approach. 
+ */ + if (lfc_ctl->size >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru)) + { + /* Cache overflow: evict least recently used chunk */ + FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + Assert(victim->access_count == 0); + entry->offset = victim->offset; /* grab victim's chunk */ + hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL); + elog(LOG, "Swap file cache page"); + } + else + entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */ + entry->access_count = 1; + memset(entry->bitmap, 0, sizeof entry->bitmap); + } + LWLockRelease(lfc_lock); + + /* Open cache file if not done yet */ + if (lfc_desc == 0) + { + lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT); + if (lfc_desc < 0) { + elog(LOG, "Failed to open file cache %s: %m", lfc_path); + lfc_size_limit = 0; /* disable file cache */ + } + } + if (lfc_desc > 0) + { + rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ); + if (rc != BLCKSZ) + { + elog(INFO, "Failed to write file cache: %m"); + lfc_size_limit = 0; /* disable file cache */ + } + } + /* Place entry to the head of LRU list */ + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + Assert(entry->access_count > 0); + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + if (lfc_size_limit != 0) + entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31)); + LWLockRelease(lfc_lock); +} + + +/* + * Record structure holding the to be exposed cache data. + */ +typedef struct +{ + uint32 pageoffs; + Oid relfilenode; + Oid reltablespace; + Oid reldatabase; + ForkNumber forknum; + BlockNumber blocknum; + uint16 accesscount; +} LocalCachePagesRec; + +/* + * Function context for data persisting over repeated calls. + */ +typedef struct +{ + TupleDesc tupdesc; + LocalCachePagesRec *record; +} LocalCachePagesContext; + +/* + * Function returning data from the local file cache + * relation node/tablespace/database/blocknum and access_counter + */ +PG_FUNCTION_INFO_V1(local_cache_pages); + +#define NUM_LOCALCACHE_PAGES_ELEM 7 + +Datum +local_cache_pages(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + Datum result; + MemoryContext oldcontext; + LocalCachePagesContext *fctx; /* User function context. */ + TupleDesc tupledesc; + TupleDesc expected_tupledesc; + HeapTuple tuple; + + if (SRF_IS_FIRSTCALL()) + { + HASH_SEQ_STATUS status; + FileCacheEntry* entry; + uint32 n_pages = 0; + uint32 i; + + funcctx = SRF_FIRSTCALL_INIT(); + + /* Switch context when allocating stuff to be used in later calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* Create a user function context for cross-call persistence */ + fctx = (LocalCachePagesContext *) palloc(sizeof(LocalCachePagesContext)); + + /* + * To smoothly support upgrades from version 1.0 of this extension + * transparently handle the (non-)existence of the pinning_backends + * column. We unfortunately have to get the result type for that... - + * we can't use the result type determined by the function definition + * without potentially crashing when somebody uses the old (or even + * wrong) function definition though. + */ + if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + if (expected_tupledesc->natts != NUM_LOCALCACHE_PAGES_ELEM) + elog(ERROR, "incorrect number of output arguments"); + + /* Construct a tuple descriptor for the result rows. 
*/ + tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); + TupleDescInitEntry(tupledesc, (AttrNumber) 1, "pageoffs", + INT8OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber", + INT2OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber", + INT8OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 7, "accesscount", + INT4OID, -1, 0); + + fctx->tupdesc = BlessTupleDesc(tupledesc); + + LWLockAcquire(lfc_lock, LW_SHARED); + + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0; + } + fctx->record = (LocalCachePagesRec *) + MemoryContextAllocHuge(CurrentMemoryContext, + sizeof(LocalCachePagesRec) * n_pages); + + /* Set max calls and remember the user function context. */ + funcctx->max_calls = n_pages; + funcctx->user_fctx = fctx; + + /* Return to original context when allocating transient memory */ + MemoryContextSwitchTo(oldcontext); + + /* + * Scan through all the buffers, saving the relevant fields in the + * fctx->record structure. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header + * locks, so the information of each buffer is self-consistent. + */ + n_pages = 0; + hash_seq_init(&status, lfc_hash); + while ((entry = hash_seq_search(&status)) != NULL) + { + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + { + if (entry->bitmap[i >> 5] & (1 << (i & 31))) + { + fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; + fctx->record[n_pages].relfilenode = entry->key.rnode.relNode; + fctx->record[n_pages].reltablespace = entry->key.rnode.spcNode; + fctx->record[n_pages].reldatabase = entry->key.rnode.dbNode; + fctx->record[n_pages].forknum = entry->key.forkNum; + fctx->record[n_pages].blocknum = entry->key.blockNum + i; + fctx->record[n_pages].accesscount = entry->access_count; + n_pages += 1; + } + } + } + Assert(n_pages == funcctx->max_calls); + LWLockRelease(lfc_lock); + } + + funcctx = SRF_PERCALL_SETUP(); + + /* Get the saved state */ + fctx = funcctx->user_fctx; + + if (funcctx->call_cntr < funcctx->max_calls) + { + uint32 i = funcctx->call_cntr; + Datum values[NUM_LOCALCACHE_PAGES_ELEM]; + bool nulls[NUM_LOCALCACHE_PAGES_ELEM] = { + false, false, false, false, false, false, false + }; + + values[0] = Int64GetDatum((int64) fctx->record[i].pageoffs); + values[1] = ObjectIdGetDatum(fctx->record[i].relfilenode); + values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace); + values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase); + values[4] = ObjectIdGetDatum(fctx->record[i].forknum); + values[5] = Int64GetDatum((int64) fctx->record[i].blocknum); + values[6] = Int32GetDatum(fctx->record[i].accesscount); + + /* Build and return the tuple. 
*/ + tuple = heap_form_tuple(fctx->tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else + SRF_RETURN_DONE(funcctx); +} diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 1aba2e1ede..5f134e3924 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -516,4 +516,5 @@ pg_init_libpagestore(void) smgr_init_hook = smgr_init_neon; dbsize_hook = neon_dbsize; } + lfc_init(); } diff --git a/pgxn/neon/neon--1.0.sql b/pgxn/neon/neon--1.0.sql index 58b98a5923..6cf111ea6a 100644 --- a/pgxn/neon/neon--1.0.sql +++ b/pgxn/neon/neon--1.0.sql @@ -22,3 +22,13 @@ AS 'MODULE_PATHNAME', 'backpressure_throttling_time' LANGUAGE C STRICT PARALLEL UNSAFE; +CREATE FUNCTION local_cache_pages() +RETURNS SETOF RECORD +AS 'MODULE_PATHNAME', 'local_cache_pages' +LANGUAGE C PARALLEL SAFE; + +-- Create a view for convenient access. +CREATE VIEW local_cache AS + SELECT P.* FROM local_cache_pages() AS P + (pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid, + relforknumber int2, relblocknumber int8, accesscount int4); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 170a0cb72d..831756b849 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -203,4 +203,11 @@ extern void set_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumbe extern void update_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size); extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum); +/* functions for local file cache */ +extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer); +extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer); +extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno); +extern void lfc_init(void); + + #endif diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 900f44ca10..0b34cb3ca9 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1684,6 +1684,8 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, forkNum, blkno, (uint32) (lsn >> 32), (uint32) lsn); + lfc_write(reln->smgr_rnode.node, forkNum, blkno, buffer); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdextend(reln, forkNum, blkno, buffer, skipFsync); @@ -1757,6 +1759,9 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } + if (lfc_cache_contains(reln->smgr_rnode.node, forknum, blocknum)) + return false; + tag = (BufferTag) { .rnode = reln->smgr_rnode.node, .forkNum = forknum, @@ -1899,6 +1904,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, { case T_NeonGetPageResponse: memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ); + lfc_write(rnode, forkNum, blkno, buffer); break; case T_NeonErrorResponse: @@ -1950,6 +1956,12 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } + /* Try to read from local file cache */ + if (lfc_read(reln->smgr_rnode.node, forkNum, blkno, buffer)) + { + return; + } + request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forkNum, blkno); neon_read_at_lsn(reln->smgr_rnode.node, forkNum, blkno, request_lsn, latest, buffer); @@ -2111,6 +2123,8 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, forknum, 
blocknum, (uint32) (lsn >> 32), (uint32) lsn); + lfc_write(reln->smgr_rnode.node, forknum, blocknum, buffer); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdwrite(reln, forknum, blocknum, buffer, skipFsync);
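
For readers tracing lfc_read() and lfc_write() above, the following standalone C sketch (not part of the patch; the block and slot numbers are made up) shows how a relation block number is decomposed into the chunk tag used as the hash key, the offset within the 1MB chunk, the bitmap bit that records the block as cached, and the byte offset in the cache file:

/*
 * Standalone sketch (not part of the patch) of the block-address arithmetic
 * used by lfc_read()/lfc_write(): the low 7 bits of the block number select a
 * block inside a 128-block (1MB) chunk, the remaining bits form the chunk tag
 * looked up in the shared hash, and one bit of the per-chunk bitmap marks the
 * block as present.  "entry_offset" stands in for FileCacheEntry.offset, i.e.
 * the chunk's slot in the cache file; the values below are illustrative only.
 */
#include <stdio.h>
#include <sys/types.h>

#define BLCKSZ           8192
#define BLOCKS_PER_CHUNK 128	/* 1MB chunk, as in the patch */

int
main(void)
{
	unsigned	blkno = 1000;		/* hypothetical relation block number */
	unsigned	entry_offset = 3;	/* hypothetical chunk slot in the cache file */

	unsigned	tag_blkno = blkno & ~(BLOCKS_PER_CHUNK - 1);	/* BufferTag.blockNum used as hash key */
	unsigned	chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);	/* block index inside the chunk */
	unsigned	bitmap_word = chunk_offs >> 5;					/* which uint32 of bitmap[] */
	unsigned	bitmap_bit = chunk_offs & 31;					/* which bit of that word */
	off_t		file_off = ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ;

	printf("blkno=%u -> tag.blockNum=%u, chunk_offs=%u, bitmap[%u] bit %u, file offset=%lld\n",
		   blkno, tag_blkno, chunk_offs, bitmap_word, bitmap_bit, (long long) file_off);
	return 0;
}

With 128-block chunks a single 40-byte hash entry covers 1MB of data, which is how the header comment arrives at roughly 320MB of hash map for an 8TB database instead of about 40GB with per-page entries.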
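
A hedged usage sketch follows, assuming the neon extension is installed (CREATE EXTENSION neon) and loaded via shared_preload_libraries; the sizes and path are illustrative values, not recommendations. It shows how the cache added by this patch is configured through the GUCs defined in lfc_init() and inspected through the local_cache view created in neon--1.0.sql:

-- postgresql.conf (PGC_POSTMASTER settings, require a restart):
--   shared_preload_libraries = 'neon'
--   neon.max_file_cache_size = '1024MB'      -- hard cap; also sizes the shared hash
--   neon.file_cache_path     = 'file.cache'  -- backing file (can be a raw device)

-- The soft limit is PGC_SIGHUP, so it can be changed at runtime; shrinking it
-- makes lfc_change_limit_hook() evict least recently used chunks and, on Linux,
-- punch holes in the backing file to return the space.
ALTER SYSTEM SET neon.file_cache_size_limit = '512MB';
SELECT pg_reload_conf();

-- Summarize cached pages per relation through the view from neon--1.0.sql.
SELECT reldatabase, relfilenode, relforknumber, count(*) AS cached_pages
  FROM local_cache
 GROUP BY 1, 2, 3
 ORDER BY cached_pages DESC
 LIMIT 10;

The accesscount column in the view reflects FileCacheEntry.access_count, i.e. in-flight pins, so nonzero values only appear for chunks with IO in progress at the moment of the scan.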