diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 4a2199d8d8..1736165c7d 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -99,7 +99,7 @@ static char *hexdump_page(char *page); #define IS_LOCAL_REL(reln) (\ NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \ - NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \ + NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \ ) const int SmgrTrace = DEBUG5; @@ -336,9 +336,9 @@ static PrefetchState *MyPState; static bool compact_prefetch_buffers(void); static void consume_prefetch_responses(void); -static bool prefetch_read(PrefetchRequest *slot, bool sync); +static bool prefetch_read(PrefetchRequest *slot); static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns); -static bool prefetch_wait_for(uint64 ring_index, bool sync); +static bool prefetch_wait_for(uint64 ring_index); static void prefetch_cleanup_trailing_unused(void); static inline void prefetch_set_unused(uint64 ring_index); @@ -503,9 +503,8 @@ prefetch_pump_state(bool IsHandlingInterrupts) slot->status = PRFS_RECEIVED; slot->response = response; - if (response->tag == T_NeonGetPageResponse && lfc_store_prefetch_result) + if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) { - Assert(!(slot->flags & PRFSF_LFC)); /* * Store prefetched result in LFC (please read comments to lfc_prefetch * explaining why it can be done without holding shared buffer lock @@ -514,12 +513,6 @@ prefetch_pump_state(bool IsHandlingInterrupts) { slot->flags |= PRFSF_LFC; } - else - { - prefetch_set_unused(slot->my_ring_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - } } } @@ -549,7 +542,7 @@ readahead_buffer_resize(int newsize, void *extra) */ if (MyPState->n_requests_inflight > newsize) { - prefetch_wait_for(MyPState->ring_unused - newsize - 1, false); + prefetch_wait_for(MyPState->ring_unused - newsize - 1); Assert(MyPState->n_requests_inflight <= newsize); } @@ -650,7 +643,7 @@ static void consume_prefetch_responses(void) { if (MyPState->ring_receive < MyPState->ring_unused) - prefetch_wait_for(MyPState->ring_unused - 1, false); + prefetch_wait_for(MyPState->ring_unused - 1); } static void @@ -698,7 +691,7 @@ prefetch_flush_requests(void) * function's call path. */ static bool -prefetch_wait_for(uint64 ring_index, bool sync) +prefetch_wait_for(uint64 ring_index) { PrefetchRequest *entry; bool result = true; @@ -719,7 +712,7 @@ prefetch_wait_for(uint64 ring_index, bool sync) entry = GetPrfSlot(MyPState->ring_receive); Assert(entry->status == PRFS_REQUESTED); - if (!prefetch_read(entry, sync)) + if (!prefetch_read(entry)) { result = false; break; @@ -744,7 +737,7 @@ prefetch_wait_for(uint64 ring_index, bool sync) * NOTE: this does IO, and can get canceled out-of-line. */ static bool -prefetch_read(PrefetchRequest *slot, bool sync) +prefetch_read(PrefetchRequest *slot) { NeonResponse *response; MemoryContext old; @@ -798,7 +791,7 @@ prefetch_read(PrefetchRequest *slot, bool sync) slot->status = PRFS_RECEIVED; slot->response = response; - if (response->tag == T_NeonGetPageResponse && !sync && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) + if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) { /* * Store prefetched result in LFC (please read comments to lfc_prefetch @@ -808,12 +801,6 @@ prefetch_read(PrefetchRequest *slot, bool sync) { slot->flags |= PRFSF_LFC; } - else - { - prefetch_set_unused(slot->my_ring_index); - pgBufferUsage.prefetch.expired += 1; - MyNeonCounters->getpage_prefetch_discards_total += 1; - } } return true; } @@ -1177,7 +1164,7 @@ Retry: if (!neon_prefetch_response_usable(lsns, slot)) { /* Wait for the old request to finish and discard it */ - if (!prefetch_wait_for(ring_index, false)) + if (!prefetch_wait_for(ring_index)) goto Retry; prefetch_set_unused(ring_index); entry = NULL; @@ -1269,7 +1256,7 @@ Retry: { case PRFS_REQUESTED: Assert(MyPState->ring_receive == cleanup_index); - if (!prefetch_wait_for(cleanup_index, false)) + if (!prefetch_wait_for(cleanup_index)) goto Retry; prefetch_set_unused(cleanup_index); pgBufferUsage.prefetch.expired += 1; @@ -3233,7 +3220,7 @@ Retry: */ if (slot->status == PRFS_REQUESTED) { - if (!prefetch_wait_for(slot->my_ring_index, false)) + if (!prefetch_wait_for(slot->my_ring_index)) goto Retry; } /* drop caches */ @@ -3271,11 +3258,11 @@ Retry: Assert(slot->status != PRFS_UNUSED); Assert(GetPrfSlot(ring_index) == slot); - } while (!prefetch_wait_for(ring_index, true)); + } while (!prefetch_wait_for(ring_index)); Assert(slot->status == PRFS_RECEIVED); Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0); - Assert(hashkey.buftag.blockNum == blockno); + Assert(hashkey.buftag.blockNum == base_blockno + i); resp = slot->response; @@ -3291,18 +3278,23 @@ Retry: resp->not_modified_since != slot->request_lsns.not_modified_since || !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || getpage_resp->req.forknum != forkNum || - getpage_resp->req.blkno != blockno) + getpage_resp->req.blkno != base_blockno + i) { NEON_PANIC_CONNECTION_STATE(-1, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, blockno); + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); } } memcpy(buffer, getpage_resp->page, BLCKSZ); - lfc_write(rinfo, forkNum, blockno, buffer); - + /* + * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received + * from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here + * under buffer lock. + */ + if (!lfc_store_prefetch_result) + lfc_write(rinfo, forkNum, blockno, buffer); break; } case T_NeonErrorResponse: