Compare commits

...

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d05cefaae1 Fix mistyping 2025-06-12 08:07:17 +03:00
Konstantin Knizhnik
fad69227d6 Fix bug in neon_redo_read_buffer_filter 2025-06-11 20:20:54 +03:00
Konstantin Knizhnik
4a397638bf Prohibit partial redo of wal records: if record affects several pages then either all of them are reconstructed, either all skipped 2025-06-11 18:19:04 +03:00
Konstantin Knizhnik
b34648c136 Remove XLogWaitForReplayOf at replica to avoid deadlock 2025-06-11 17:56:25 +03:00
2 changed files with 76 additions and 58 deletions

View File

@@ -2085,9 +2085,6 @@ communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber ba
start_ts = GetCurrentTimestamp();
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
XLogWaitForReplayOf(reqlsns->request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/

View File

@@ -2382,6 +2382,8 @@ get_fsm_physical_block(BlockNumber heapblk)
static bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
static XLogRecPtr last_recptr = InvalidXLogRecPtr;
static bool last_no_redo_needed;
XLogRecPtr end_recptr = record->EndRecPtr;
NRelFileInfo rinfo;
ForkNumber forknum;
@@ -2390,69 +2392,88 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
uint32 hash;
LWLock *partitionLock;
int buf_id;
bool no_redo_needed;
bool no_redo_needed = true;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
if (last_recptr != end_recptr)
{
no_redo_needed = false;
#if PG_VERSION_NUM < 150000
int max_block_id = record->max_block_id;
#else
int max_block_id = XLogRecMaxBlockId(record);
#endif
for (int block_id = 0; block_id <= max_block_id && no_redo_needed; block_id++)
{
if (XLogRecHasBlockRef(record, block_id))
{
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
{
no_redo_needed = false;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
}
}
}
last_recptr = end_recptr;
last_no_redo_needed = no_redo_needed;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
no_redo_needed = last_no_redo_needed;
}
return no_redo_needed;
}