Don't update the legacy last-written LSN cache with new communicator

The new communicator has its own tracking
This commit is contained in:
Heikki Linnakangas
2025-07-30 17:31:51 +03:00
parent 95ef69ca95
commit fca52af7e3
4 changed files with 82 additions and 48 deletions

View File

@@ -526,7 +526,7 @@ _PG_init(void)
lfc_init();
pg_init_prewarm();
pg_init_walproposer();
init_lwlsncache();
pg_init_lwlsncache();
pg_init_communicator_process();

View File

@@ -85,12 +85,54 @@ static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL;
static void neon_set_max_lwlsn(XLogRecPtr lsn);
void
init_lwlsncache(void)
pg_init_lwlsncache(void)
{
if (!process_shared_preload_libraries_in_progress)
ereport(ERROR, errcode(ERRCODE_INTERNAL_ERROR), errmsg("Loading of shared preload libraries is not in progress. Exiting"));
lwlc_register_gucs();
}
void
LwLsnCacheShmemRequest(void)
{
Size requested_size;
if (neon_use_communicator_worker)
return;
requested_size = sizeof(LwLsnCacheCtl);
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
RequestAddinShmemSpace(requested_size);
}
void
LwLsnCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (neon_use_communicator_worker)
return;
Assert(lwlsn_cache_size > 0);
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(LastWrittenLsnCacheEntry);
lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache",
lwlsn_cache_size, lwlsn_cache_size,
&info,
HASH_ELEM | HASH_BLOBS);
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found);
// Now set the size in the struct
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
if (found) {
return;
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
@@ -106,41 +148,6 @@ init_lwlsncache(void)
set_lwlsn_db_hook = neon_set_lwlsn_db;
}
void
LwLsnCacheShmemRequest(void)
{
Size requested_size = sizeof(LwLsnCacheCtl);
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
RequestAddinShmemSpace(requested_size);
}
void
LwLsnCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (lwlsn_cache_size > 0)
{
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(LastWrittenLsnCacheEntry);
lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache",
lwlsn_cache_size, lwlsn_cache_size,
&info,
HASH_ELEM | HASH_BLOBS);
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found);
// Now set the size in the struct
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
if (found) {
return;
}
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
}
/*
* neon_get_lwlsn -- Returns maximal LSN of written page.
* It returns an upper bound for the last written LSN of a given page,
@@ -155,6 +162,7 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
XLogRecPtr lsn;
LastWrittenLsnCacheEntry* entry;
Assert(!neon_use_communicator_worker);
Assert(LwLsnCache->lastWrittenLsnCacheSize != 0);
LWLockAcquire(LastWrittenLsnLock, LW_SHARED);
@@ -207,7 +215,10 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
return lsn;
}
static void neon_set_max_lwlsn(XLogRecPtr lsn) {
static void
neon_set_max_lwlsn(XLogRecPtr lsn)
{
Assert(!neon_use_communicator_worker);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
LwLsnCache->maxLastWrittenLsn = lsn;
LWLockRelease(LastWrittenLsnLock);
@@ -228,6 +239,7 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum,
LastWrittenLsnCacheEntry* entry;
XLogRecPtr lsn;
Assert(!neon_use_communicator_worker);
Assert(LwLsnCache->lastWrittenLsnCacheSize != 0);
Assert(nblocks > 0);
Assert(PointerIsValid(lsns));
@@ -376,6 +388,8 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
Assert(!neon_use_communicator_worker);
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
@@ -412,6 +426,8 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
Oid dbOid = NInfoGetDbOid(relfilenode);
Oid relNumber = NInfoGetRelNumber(relfilenode);
Assert(!neon_use_communicator_worker);
if (lsns == NULL || nblocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0 ||
NInfoGetRelNumber(relfilenode) == InvalidOid)
return InvalidXLogRecPtr;
@@ -469,6 +485,7 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
XLogRecPtr
neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
{
Assert(!neon_use_communicator_worker);
return neon_set_lwlsn_block_range(lsn, rlocator, forknum, blkno, 1);
}
@@ -478,6 +495,7 @@ neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum,
XLogRecPtr
neon_set_lwlsn_relation(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum)
{
Assert(!neon_use_communicator_worker);
return neon_set_lwlsn_block(lsn, rlocator, forknum, REL_METADATA_PSEUDO_BLOCKNO);
}
@@ -488,6 +506,8 @@ XLogRecPtr
neon_set_lwlsn_db(XLogRecPtr lsn)
{
NRelFileInfo dummyNode = {InvalidOid, InvalidOid, InvalidOid};
Assert(!neon_use_communicator_worker);
return neon_set_lwlsn_block(lsn, dummyNode, MAIN_FORKNUM, 0);
}

View File

@@ -3,7 +3,7 @@
#include "neon_pgversioncompat.h"
void init_lwlsncache(void);
extern void pg_init_lwlsncache(void);
/* Hooks */
XLogRecPtr neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno);
@@ -14,4 +14,4 @@ XLogRecPtr neon_set_lwlsn_block(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumbe
XLogRecPtr neon_set_lwlsn_relation(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum);
XLogRecPtr neon_set_lwlsn_db(XLogRecPtr lsn);
#endif /* NEON_LWLSNCACHE_H */
#endif /* NEON_LWLSNCACHE_H */

View File

@@ -302,7 +302,7 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/
lsns[batch_size++] = lsn;
if (batch_size >= BLOCK_BATCH_SIZE)
if (batch_size >= BLOCK_BATCH_SIZE && !neon_use_communicator_worker)
{
neon_set_lwlsn_block_v(lsns, InfoFromSMgrRel(reln), forknum,
batch_blockno,
@@ -312,7 +312,7 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
}
if (batch_size != 0)
if (batch_size != 0 && !neon_use_communicator_worker)
{
neon_set_lwlsn_block_v(lsns, InfoFromSMgrRel(reln), forknum,
batch_blockno,
@@ -564,6 +564,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
{
XLogRecPtr last_written_lsns[PG_IOV_MAX];
Assert(!neon_use_communicator_worker);
Assert(nblocks <= PG_IOV_MAX);
neon_get_lwlsn_v(rinfo, forknum, blkno, (int) nblocks, last_written_lsns);
@@ -987,6 +988,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
#endif
{
XLogRecPtr lsn;
bool lsn_was_zero;
BlockNumber n_blocks = 0;
switch (reln->smgr_relpersistence)
@@ -1051,9 +1053,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn);
/*
* smgr_extend is often called with an all-zeroes page, so
* lsn==InvalidXLogRecPtr. An smgr_write() call will come for the buffer
* later, after it has been initialized with the real page contents, and
* it is eventually evicted from the buffer cache. But we need a valid LSN
* to the relation metadata update now.
*/
lsn_was_zero = (lsn == InvalidXLogRecPtr);
if (lsn_was_zero)
lsn = GetXLogInsertRecPtr();
if (neon_use_communicator_worker)
{
// FIXME: this can pass lsn == invalid. Is that ok?
communicator_new_rel_extend(InfoFromSMgrRel(reln), forkNum, blkno, (const void *) buffer, lsn);
if (debug_compare_local)
@@ -1080,11 +1092,8 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
* it is eventually evicted from the buffer cache. But we need a valid LSN
* to the relation metadata update now.
*/
if (lsn == InvalidXLogRecPtr)
{
lsn = GetXLogInsertRecPtr();
if (lsn_was_zero)
neon_set_lwlsn_block(lsn, InfoFromSMgrRel(reln), forkNum, blkno);
}
neon_set_lwlsn_relation(lsn, InfoFromSMgrRel(reln), forkNum);
}
}
@@ -2667,16 +2676,21 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
if (neon_use_communicator_worker)
{
no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno);
// FIXME: update lwlsn
}
else
{
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
}
}
LWLockRelease(partitionLock);