From 03d635b916ed057826d80bbc709864acb1c108f1 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 9 May 2025 12:07:08 +0300 Subject: [PATCH] Add more guards for prefetch_pump_state (#11859) ## Problem See https://neondb.slack.com/archives/C08PJ07BZ44/p1746566292750689 It looks like there are more cases where `prefetch_pump_state` can be called in an unexpected place and cause a core dump. ## Summary of changes Add more guards. --------- Co-authored-by: Konstantin Knizhnik --- pgxn/neon/communicator.c | 36 +++++++++++++++++++++--------------- pgxn/neon/communicator.h | 2 +- pgxn/neon/pagestore_smgr.c | 20 ++++++++++---------- vendor/postgres-v16 | 2 +- vendor/revisions.json | 2 +- 5 files changed, 34 insertions(+), 28 deletions(-) diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 818a149499..9609f186b9 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -425,15 +425,12 @@ compact_prefetch_buffers(void) * point inside and outside PostgreSQL. * * This still does throw errors when it receives malformed responses from PS. - * - * When we're not called from CHECK_FOR_INTERRUPTS (indicated by - * IsHandlingInterrupts) we also report we've ended prefetch receive work, - * just in case state tracking was lost due to an error in the sync getPage - * response code. 
*/ void -communicator_prefetch_pump_state(bool IsHandlingInterrupts) +communicator_prefetch_pump_state(void) { + START_PREFETCH_RECEIVE_WORK(); + while (MyPState->ring_receive != MyPState->ring_flush) { NeonResponse *response; @@ -482,9 +479,7 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts) } } - /* We never pump the prefetch state while handling other pages */ - if (!IsHandlingInterrupts) - END_PREFETCH_RECEIVE_WORK(); + END_PREFETCH_RECEIVE_WORK(); communicator_reconfigure_timeout_if_needed(); } @@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index) Assert(MyPState->ring_unused > ring_index); + START_PREFETCH_RECEIVE_WORK(); + while (MyPState->ring_receive <= ring_index) { - START_PREFETCH_RECEIVE_WORK(); entry = GetPrfSlot(MyPState->ring_receive); Assert(entry->status == PRFS_REQUESTED); @@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index) result = false; break; } - - END_PREFETCH_RECEIVE_WORK(); CHECK_FOR_INTERRUPTS(); } + if (result) { /* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */ PrefetchRequest *slot = GetPrfSlot(ring_index); - return slot->status == PRFS_RECEIVED; + result = slot->status == PRFS_RECEIVED; } - return false; + END_PREFETCH_RECEIVE_WORK(); + + return result; ; } @@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot) Assert(slot->status == PRFS_REQUESTED); Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_receive); + Assert(readpage_reentrant_guard); if (slot->status != PRFS_REQUESTED || slot->response != NULL || @@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag) PrfHashEntry *entry; PrefetchRequest hashkey; + Assert(readpage_reentrant_guard); hashkey.buftag = tag; entry = prfh_lookup(MyPState->prf_hash, &hashkey); if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index)) @@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag) void prefetch_on_ps_disconnect(void) { + bool 
save_readpage_reentrant_guard = readpage_reentrant_guard; MyPState->ring_flush = MyPState->ring_unused; + /* Prohibit callig of prefetch_pump_state */ + START_PREFETCH_RECEIVE_WORK(); + while (MyPState->ring_receive < MyPState->ring_unused) { PrefetchRequest *slot; @@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void) MyNeonCounters->getpage_prefetch_discards_total += 1; } + /* Restore guard */ + readpage_reentrant_guard = save_readpage_reentrant_guard; + /* * We can have gone into retry due to network error, so update stats with * the latest available @@ -2509,7 +2515,7 @@ communicator_processinterrupts(void) if (timeout_signaled) { if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0) - communicator_prefetch_pump_state(true); + communicator_prefetch_pump_state(); timeout_signaled = false; communicator_reconfigure_timeout_if_needed(); diff --git a/pgxn/neon/communicator.h b/pgxn/neon/communicator.h index f55c4b10f1..5376c9b839 100644 --- a/pgxn/neon/communicator.h +++ b/pgxn/neon/communicator.h @@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno, void *buffer); extern void communicator_reconfigure_timeout_if_needed(void); -extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts); +extern void communicator_prefetch_pump_state(void); #endif diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 87eb420717..f574517b2a 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, blocknum += iterblocks; } - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); return false; } @@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); return false; } @@ -1262,7 +1262,7 @@ 
neon_writeback(SMgrRelation reln, ForkNumber forknum, */ neon_log(SmgrTrace, "writeback noop"); - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer } /* Try to read PS results if they are available */ - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1); @@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer /* * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. */ - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) @@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nblocks, PG_IOV_MAX); /* Try to read PS results if they are available */ - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks); @@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, /* * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. 
*/ - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) @@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer); - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks); - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); - communicator_prefetch_pump_state(false); + communicator_prefetch_pump_state(); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 05ddf212e2..d72d76f2cd 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 05ddf212e2e07b788b5c8b88bdcf98630941f6ae +Subproject commit d72d76f2cdee4194dd052ce099e9784aca7c794a diff --git a/vendor/revisions.json b/vendor/revisions.json index 10aad7e1a2..e76510f969 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -5,7 +5,7 @@ ], "v16": [ "16.8", - "05ddf212e2e07b788b5c8b88bdcf98630941f6ae" + "d72d76f2cdee4194dd052ce099e9784aca7c794a" ], "v15": [ "15.12",