From 735ccee5b2f51e8a900e8dec7cb63b31291c8fbe Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 27 Mar 2025 14:12:54 +0100 Subject: [PATCH] Do not overwrite buffer filled by prefetch_lookup in lfc_readv_select --- pgxn/neon/file_cache.c | 60 +++++++++++++++++++++++++++++--------- pgxn/neon/pagestore_smgr.c | 56 ++++++++++++++++++++--------------- 2 files changed, 79 insertions(+), 37 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e555e069d0..26e6fc988c 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { struct iovec iov[PG_IOV_MAX]; int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); - int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); + int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs); int iteration_hits = 0; int iteration_misses = 0; uint64 io_time_us = 0; @@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0); iov[i].iov_base = buffers[buf_offset + i]; iov[i].iov_len = BLCKSZ; - BITMAP_CLR(mask, buf_offset + i); } if (n_blocks_to_read == 0) { + for (int i = 0; i < blocks_in_chunk; i++) + { + BITMAP_CLR(mask, buf_offset + i); + } buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; blkno += blocks_in_chunk; @@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (entry == NULL) { /* Pages are not cached */ + for (int i = 0; i < blocks_in_chunk; i++) + { + BITMAP_CLR(mask, buf_offset + i); + } lfc_ctl->misses += blocks_in_chunk; pgBufferUsage.file_cache.misses += blocks_in_chunk; LWLockRelease(lfc_lock); @@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { FileCacheBlockState state = UNAVAILABLE; bool sleeping = false; + + if (!BITMAP_ISSET(mask, buf_offset + i)) + continue; + while (lfc_ctl->generation == generation) { state = GET_STATE(entry, chunk_offs + i); @@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } if (state == AVAILABLE) { - BITMAP_SET(mask, buf_offset + i); iteration_hits++; } else + { + BITMAP_CLR(mask, buf_offset + i); iteration_misses++; + } } LWLockRelease(lfc_lock); @@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (iteration_hits != 0) { - pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ); - rc = preadv(lfc_desc, iov, blocks_in_chunk, - ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); - pgstat_report_wait_end(); - - if (rc != (BLCKSZ * blocks_in_chunk)) + if (blocks_in_chunk == n_blocks_to_read) { - lfc_disable("read"); - return -1; + pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ); + rc = preadv(lfc_desc, iov, blocks_in_chunk, + ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); + pgstat_report_wait_end(); + + if (rc != (BLCKSZ * blocks_in_chunk)) + { + lfc_disable("read"); + return -1; + } + } + else + { + /* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */ + for (int i = 0; i < blocks_in_chunk; i++) + { + if (BITMAP_ISSET(mask, buf_offset + i)) + { + pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ); + rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ); + pgstat_report_wait_end(); + if (rc != BLCKSZ) + { + lfc_disable("read"); + return -1; + } + } + } } } @@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, LWLockRelease(lfc_lock); return false; } - + lwlsn = neon_get_lwlsn(rinfo, forknum, blkno); if (lwlsn > lsn) { - elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", + elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X", blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn)); LWLockRelease(lfc_lock); return false; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 1736165c7d..4a2199d8d8 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -99,7 +99,7 @@ static char *hexdump_page(char *page); #define IS_LOCAL_REL(reln) (\ NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \ - NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \ + NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \ ) const int SmgrTrace = DEBUG5; @@ -336,9 +336,9 @@ static PrefetchState *MyPState; static bool compact_prefetch_buffers(void); static void consume_prefetch_responses(void); -static bool prefetch_read(PrefetchRequest *slot); +static bool prefetch_read(PrefetchRequest *slot, bool sync); static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns); -static bool prefetch_wait_for(uint64 ring_index); +static bool prefetch_wait_for(uint64 ring_index, bool sync); static void prefetch_cleanup_trailing_unused(void); static inline void prefetch_set_unused(uint64 ring_index); @@ -503,8 +503,9 @@ prefetch_pump_state(bool IsHandlingInterrupts) slot->status = PRFS_RECEIVED; slot->response = response; - if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) + if (response->tag == T_NeonGetPageResponse && lfc_store_prefetch_result) { + Assert(!(slot->flags & PRFSF_LFC)); /* * Store prefetched result in LFC (please read comments to lfc_prefetch * explaining why it can be done without holding shared buffer lock @@ -513,6 +514,12 @@ prefetch_pump_state(bool IsHandlingInterrupts) { slot->flags |= PRFSF_LFC; } + else + { + prefetch_set_unused(slot->my_ring_index); + pgBufferUsage.prefetch.expired += 1; + MyNeonCounters->getpage_prefetch_discards_total += 1; + } } } @@ -542,7 +549,7 @@ readahead_buffer_resize(int newsize, void *extra) */ if (MyPState->n_requests_inflight > newsize) { - prefetch_wait_for(MyPState->ring_unused - newsize - 1); + prefetch_wait_for(MyPState->ring_unused - newsize - 1, false); Assert(MyPState->n_requests_inflight <= newsize); } @@ -643,7 +650,7 @@ static void consume_prefetch_responses(void) { if (MyPState->ring_receive < MyPState->ring_unused) - prefetch_wait_for(MyPState->ring_unused - 1); + prefetch_wait_for(MyPState->ring_unused - 1, false); } static void @@ -691,7 +698,7 @@ prefetch_flush_requests(void) * function's call path. */ static bool -prefetch_wait_for(uint64 ring_index) +prefetch_wait_for(uint64 ring_index, bool sync) { PrefetchRequest *entry; bool result = true; @@ -712,7 +719,7 @@ prefetch_wait_for(uint64 ring_index) entry = GetPrfSlot(MyPState->ring_receive); Assert(entry->status == PRFS_REQUESTED); - if (!prefetch_read(entry)) + if (!prefetch_read(entry, sync)) { result = false; break; @@ -737,7 +744,7 @@ prefetch_wait_for(uint64 ring_index) * NOTE: this does IO, and can get canceled out-of-line. */ static bool -prefetch_read(PrefetchRequest *slot) +prefetch_read(PrefetchRequest *slot, bool sync) { NeonResponse *response; MemoryContext old; @@ -791,7 +798,7 @@ prefetch_read(PrefetchRequest *slot) slot->status = PRFS_RECEIVED; slot->response = response; - if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) + if (response->tag == T_NeonGetPageResponse && !sync && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result) { /* * Store prefetched result in LFC (please read comments to lfc_prefetch @@ -801,6 +808,12 @@ prefetch_read(PrefetchRequest *slot) { slot->flags |= PRFSF_LFC; } + else + { + prefetch_set_unused(slot->my_ring_index); + pgBufferUsage.prefetch.expired += 1; + MyNeonCounters->getpage_prefetch_discards_total += 1; + } } return true; } @@ -1164,7 +1177,7 @@ Retry: if (!neon_prefetch_response_usable(lsns, slot)) { /* Wait for the old request to finish and discard it */ - if (!prefetch_wait_for(ring_index)) + if (!prefetch_wait_for(ring_index, false)) goto Retry; prefetch_set_unused(ring_index); entry = NULL; @@ -1256,7 +1269,7 @@ Retry: { case PRFS_REQUESTED: Assert(MyPState->ring_receive == cleanup_index); - if (!prefetch_wait_for(cleanup_index)) + if (!prefetch_wait_for(cleanup_index, false)) goto Retry; prefetch_set_unused(cleanup_index); pgBufferUsage.prefetch.expired += 1; @@ -3220,7 +3233,7 @@ Retry: */ if (slot->status == PRFS_REQUESTED) { - if (!prefetch_wait_for(slot->my_ring_index)) + if (!prefetch_wait_for(slot->my_ring_index, false)) goto Retry; } /* drop caches */ @@ -3258,11 +3271,11 @@ Retry: Assert(slot->status != PRFS_UNUSED); Assert(GetPrfSlot(ring_index) == slot); - } while (!prefetch_wait_for(ring_index)); + } while (!prefetch_wait_for(ring_index, true)); Assert(slot->status == PRFS_RECEIVED); Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0); - Assert(hashkey.buftag.blockNum == base_blockno + i); + Assert(hashkey.buftag.blockNum == blockno); resp = slot->response; @@ -3278,23 +3291,18 @@ Retry: resp->not_modified_since != slot->request_lsns.not_modified_since || !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || getpage_resp->req.forknum != forkNum || - getpage_resp->req.blkno != base_blockno + i) + getpage_resp->req.blkno != blockno) { NEON_PANIC_CONNECTION_STATE(-1, PANIC, "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, blockno); } } memcpy(buffer, getpage_resp->page, BLCKSZ); - /* - * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received - * from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here - * under buffer lock. - */ - if (!lfc_store_prefetch_result) - lfc_write(rinfo, forkNum, blockno, buffer); + lfc_write(rinfo, forkNum, blockno, buffer); + break; } case T_NeonErrorResponse: