Add more guards for prefetch_pump_state (#11859)

## Problem

See https://neondb.slack.com/archives/C08PJ07BZ44/p1746566292750689

Looks like there are more cases when `prefetch_pump_state` can be called
in unexpected place and cause core dump.

## Summary of changes

Add more guards.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2025-05-09 12:07:08 +03:00
committed by GitHub
parent 5cd7f936f9
commit 03d635b916
5 changed files with 34 additions and 28 deletions

View File

@@ -425,15 +425,12 @@ compact_prefetch_buffers(void)
* point inside and outside PostgreSQL.
*
* This still does throw errors when it receives malformed responses from PS.
*
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
* just in case state tracking was lost due to an error in the sync getPage
* response code.
*/
void
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
communicator_prefetch_pump_state(void)
{
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
@@ -482,8 +479,6 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts)
}
}
/* We never pump the prefetch state while handling other pages */
if (!IsHandlingInterrupts)
END_PREFETCH_RECEIVE_WORK();
communicator_reconfigure_timeout_if_needed();
@@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index)
Assert(MyPState->ring_unused > ring_index);
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive <= ring_index)
{
START_PREFETCH_RECEIVE_WORK();
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
@@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index)
result = false;
break;
}
END_PREFETCH_RECEIVE_WORK();
CHECK_FOR_INTERRUPTS();
}
if (result)
{
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
PrefetchRequest *slot = GetPrfSlot(ring_index);
return slot->status == PRFS_RECEIVED;
result = slot->status == PRFS_RECEIVED;
}
return false;
END_PREFETCH_RECEIVE_WORK();
return result;
;
}
@@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(readpage_reentrant_guard);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
@@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
PrfHashEntry *entry;
PrefetchRequest hashkey;
Assert(readpage_reentrant_guard);
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
@@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag)
void
prefetch_on_ps_disconnect(void)
{
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
MyPState->ring_flush = MyPState->ring_unused;
/* Prohibit callig of prefetch_pump_state */
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
@@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/* Restore guard */
readpage_reentrant_guard = save_readpage_reentrant_guard;
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -2509,7 +2515,7 @@ communicator_processinterrupts(void)
if (timeout_signaled)
{
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state(true);
communicator_prefetch_pump_state();
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();

View File

@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
void *buffer);
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
extern void communicator_prefetch_pump_state(void);
#endif

View File

@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum += iterblocks;
}
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
return false;
}
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
return false;
}
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
*/
neon_log(SmgrTrace, "writeback noop");
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
@@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
@@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))

View File

@@ -5,7 +5,7 @@
],
"v16": [
"16.8",
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
"d72d76f2cdee4194dd052ce099e9784aca7c794a"
],
"v15": [
"15.12",