diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index cacccf9123..7a6936a740 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -526,7 +526,7 @@ _PG_init(void) lfc_init(); pg_init_prewarm(); pg_init_walproposer(); - init_lwlsncache(); + pg_init_lwlsncache(); pg_init_communicator_process(); diff --git a/pgxn/neon/neon_lwlsncache.c b/pgxn/neon/neon_lwlsncache.c index 5887c02c36..16935edf10 100644 --- a/pgxn/neon/neon_lwlsncache.c +++ b/pgxn/neon/neon_lwlsncache.c @@ -85,12 +85,54 @@ static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL; static void neon_set_max_lwlsn(XLogRecPtr lsn); void -init_lwlsncache(void) +pg_init_lwlsncache(void) { if (!process_shared_preload_libraries_in_progress) ereport(ERROR, errcode(ERRCODE_INTERNAL_ERROR), errmsg("Loading of shared preload libraries is not in progress. Exiting")); lwlc_register_gucs(); +} + + +void +LwLsnCacheShmemRequest(void) +{ + Size requested_size; + + if (neon_use_communicator_worker) + return; + + requested_size = sizeof(LwLsnCacheCtl); + requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry)); + + RequestAddinShmemSpace(requested_size); +} + +void +LwLsnCacheShmemInit(void) +{ + static HASHCTL info; + bool found; + + if (neon_use_communicator_worker) + return; + + Assert(lwlsn_cache_size > 0); + + info.keysize = sizeof(BufferTag); + info.entrysize = sizeof(LastWrittenLsnCacheEntry); + lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache", + lwlsn_cache_size, lwlsn_cache_size, + &info, + HASH_ELEM | HASH_BLOBS); + LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found); + // Now set the size in the struct + LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size; + if (found) { + return; + } + dlist_init(&LwLsnCache->lastWrittenLsnLRU); + LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr(); prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook; set_lwlsn_block_range_hook = neon_set_lwlsn_block_range; @@ -106,41 +148,6 @@ init_lwlsncache(void) set_lwlsn_db_hook = neon_set_lwlsn_db; } - -void -LwLsnCacheShmemRequest(void) -{ - Size requested_size = sizeof(LwLsnCacheCtl); - - requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry)); - - RequestAddinShmemSpace(requested_size); -} - -void -LwLsnCacheShmemInit(void) -{ - static HASHCTL info; - bool found; - if (lwlsn_cache_size > 0) - { - info.keysize = sizeof(BufferTag); - info.entrysize = sizeof(LastWrittenLsnCacheEntry); - lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache", - lwlsn_cache_size, lwlsn_cache_size, - &info, - HASH_ELEM | HASH_BLOBS); - LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found); - // Now set the size in the struct - LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size; - if (found) { - return; - } - } - dlist_init(&LwLsnCache->lastWrittenLsnLRU); - LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr(); -} - /* * neon_get_lwlsn -- Returns maximal LSN of written page. * It returns an upper bound for the last written LSN of a given page, @@ -155,6 +162,7 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno) XLogRecPtr lsn; LastWrittenLsnCacheEntry* entry; + Assert(!neon_use_communicator_worker); Assert(LwLsnCache->lastWrittenLsnCacheSize != 0); LWLockAcquire(LastWrittenLsnLock, LW_SHARED); @@ -207,7 +215,10 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno) return lsn; } -static void neon_set_max_lwlsn(XLogRecPtr lsn) { +static void +neon_set_max_lwlsn(XLogRecPtr lsn) +{ + Assert(!neon_use_communicator_worker); LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE); LwLsnCache->maxLastWrittenLsn = lsn; LWLockRelease(LastWrittenLsnLock); @@ -228,6 +239,7 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum, LastWrittenLsnCacheEntry* entry; XLogRecPtr lsn; + Assert(!neon_use_communicator_worker); Assert(LwLsnCache->lastWrittenLsnCacheSize != 0); Assert(nblocks > 0); Assert(PointerIsValid(lsns)); @@ -376,6 +388,8 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn, XLogRecPtr neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks) { + Assert(!neon_use_communicator_worker); + if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0) return lsn; @@ -412,6 +426,8 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode, Oid dbOid = NInfoGetDbOid(relfilenode); Oid relNumber = NInfoGetRelNumber(relfilenode); + Assert(!neon_use_communicator_worker); + if (lsns == NULL || nblocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0 || NInfoGetRelNumber(relfilenode) == InvalidOid) return InvalidXLogRecPtr; @@ -469,6 +485,7 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode, XLogRecPtr neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno) { + Assert(!neon_use_communicator_worker); return neon_set_lwlsn_block_range(lsn, rlocator, forknum, blkno, 1); } @@ -478,6 +495,7 @@ neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, XLogRecPtr neon_set_lwlsn_relation(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum) { + Assert(!neon_use_communicator_worker); return neon_set_lwlsn_block(lsn, rlocator, forknum, REL_METADATA_PSEUDO_BLOCKNO); } @@ -488,6 +506,8 @@ XLogRecPtr neon_set_lwlsn_db(XLogRecPtr lsn) { NRelFileInfo dummyNode = {InvalidOid, InvalidOid, InvalidOid}; + + Assert(!neon_use_communicator_worker); return neon_set_lwlsn_block(lsn, dummyNode, MAIN_FORKNUM, 0); } diff --git a/pgxn/neon/neon_lwlsncache.h b/pgxn/neon/neon_lwlsncache.h index acb5561c0c..e022e7a998 100644 --- a/pgxn/neon/neon_lwlsncache.h +++ b/pgxn/neon/neon_lwlsncache.h @@ -3,7 +3,7 @@ #include "neon_pgversioncompat.h" -void init_lwlsncache(void); +extern void pg_init_lwlsncache(void); /* Hooks */ XLogRecPtr neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno); @@ -14,4 +14,4 @@ XLogRecPtr neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumbe XLogRecPtr neon_set_lwlsn_relation(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum); XLogRecPtr neon_set_lwlsn_db(XLogRecPtr lsn); -#endif /* NEON_LWLSNCACHE_H */ \ No newline at end of file +#endif /* NEON_LWLSNCACHE_H */ diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index a8ee6ea92d..87f4444b39 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -302,7 +302,7 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, */ lsns[batch_size++] = lsn; - if (batch_size >= BLOCK_BATCH_SIZE) + if (batch_size >= BLOCK_BATCH_SIZE && !neon_use_communicator_worker) { neon_set_lwlsn_block_v(lsns, InfoFromSMgrRel(reln), forknum, batch_blockno, @@ -312,7 +312,7 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } } - if (batch_size != 0) + if (batch_size != 0 && !neon_use_communicator_worker) { neon_set_lwlsn_block_v(lsns, InfoFromSMgrRel(reln), forknum, batch_blockno, @@ -564,6 +564,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, { XLogRecPtr last_written_lsns[PG_IOV_MAX]; + Assert(!neon_use_communicator_worker); Assert(nblocks <= PG_IOV_MAX); neon_get_lwlsn_v(rinfo, forknum, blkno, (int) nblocks, last_written_lsns); @@ -987,6 +988,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, #endif { XLogRecPtr lsn; + bool lsn_was_zero; BlockNumber n_blocks = 0; switch (reln->smgr_relpersistence) @@ -1051,9 +1053,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, forkNum, blkno, (uint32) (lsn >> 32), (uint32) lsn); + /* + * smgr_extend is often called with an all-zeroes page, so + * lsn==InvalidXLogRecPtr. An smgr_write() call will come for the buffer + * later, after it has been initialized with the real page contents, and + * it is eventually evicted from the buffer cache. But we need a valid LSN + * to the relation metadata update now. + */ + lsn_was_zero = (lsn == InvalidXLogRecPtr); + if (lsn_was_zero) + lsn = GetXLogInsertRecPtr(); + if (neon_use_communicator_worker) { - // FIXME: this can pass lsn == invalid. Is that ok? communicator_new_rel_extend(InfoFromSMgrRel(reln), forkNum, blkno, (const void *) buffer, lsn); if (debug_compare_local) @@ -1080,11 +1092,8 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, * it is eventually evicted from the buffer cache. But we need a valid LSN * to the relation metadata update now. */ - if (lsn == InvalidXLogRecPtr) - { - lsn = GetXLogInsertRecPtr(); + if (lsn_was_zero) neon_set_lwlsn_block(lsn, InfoFromSMgrRel(reln), forkNum, blkno); - } neon_set_lwlsn_relation(lsn, InfoFromSMgrRel(reln), forkNum); } } @@ -2667,16 +2676,21 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) */ if (no_redo_needed) { - neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno); /* * Redo changes if page exists in LFC. * We should perform this check after assigning LwLSN to prevent * prefetching of some older version of the page by some other backend. */ if (neon_use_communicator_worker) + { no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); + // FIXME: update lwlsn + } else + { no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); + neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno); + } } LWLockRelease(partitionLock);