Do not overwrite buffer filled by prefetch_lookup in lfc_readv_select

This commit is contained in:
Konstantin Knizhnik
2025-03-27 14:12:54 +01:00
parent a729bc98a9
commit 735ccee5b2
2 changed files with 79 additions and 37 deletions

View File

@@ -694,7 +694,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -708,10 +708,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
}
if (n_blocks_to_read == 0)
{
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
blkno += blocks_in_chunk;
@@ -744,6 +747,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry == NULL)
{
/* Pages are not cached */
for (int i = 0; i < blocks_in_chunk; i++)
{
BITMAP_CLR(mask, buf_offset + i);
}
lfc_ctl->misses += blocks_in_chunk;
pgBufferUsage.file_cache.misses += blocks_in_chunk;
LWLockRelease(lfc_lock);
@@ -766,6 +773,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
if (!BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,11 +800,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
}
else
{
BITMAP_CLR(mask, buf_offset + i);
iteration_misses++;
}
}
LWLockRelease(lfc_lock);
@@ -801,15 +814,36 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (blocks_in_chunk == n_blocks_to_read)
{
lfc_disable("read");
return -1;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
{
lfc_disable("read");
return -1;
}
}
else
{
/* Some blocks are already prefetched in provided buffers, we should not rewrite them, so we can not use vector read */
for (int i = 0; i < blocks_in_chunk; i++)
{
if (BITMAP_ISSET(mask, buf_offset + i))
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = pread(lfc_desc, iov[i].iov_base, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs + i) * BLCKSZ);
pgstat_report_wait_end();
if (rc != BLCKSZ)
{
lfc_disable("read");
return -1;
}
}
}
}
}
@@ -1000,12 +1034,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_modified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;

View File

@@ -99,7 +99,7 @@ static char *hexdump_page(char *page);
#define IS_LOCAL_REL(reln) (\
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \
)
const int SmgrTrace = DEBUG5;
@@ -336,9 +336,9 @@ static PrefetchState *MyPState;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static bool prefetch_read(PrefetchRequest *slot);
static bool prefetch_read(PrefetchRequest *slot, bool sync);
static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns);
static bool prefetch_wait_for(uint64 ring_index);
static bool prefetch_wait_for(uint64 ring_index, bool sync);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
@@ -503,8 +503,9 @@ prefetch_pump_state(bool IsHandlingInterrupts)
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
if (response->tag == T_NeonGetPageResponse && lfc_store_prefetch_result)
{
Assert(!(slot->flags & PRFSF_LFC));
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
* explaining why it can be done without holding shared buffer lock
@@ -513,6 +514,12 @@ prefetch_pump_state(bool IsHandlingInterrupts)
{
slot->flags |= PRFSF_LFC;
}
else
{
prefetch_set_unused(slot->my_ring_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
}
}
@@ -542,7 +549,7 @@ readahead_buffer_resize(int newsize, void *extra)
*/
if (MyPState->n_requests_inflight > newsize)
{
prefetch_wait_for(MyPState->ring_unused - newsize - 1);
prefetch_wait_for(MyPState->ring_unused - newsize - 1, false);
Assert(MyPState->n_requests_inflight <= newsize);
}
@@ -643,7 +650,7 @@ static void
consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
prefetch_wait_for(MyPState->ring_unused - 1, false);
}
static void
@@ -691,7 +698,7 @@ prefetch_flush_requests(void)
* function's call path.
*/
static bool
prefetch_wait_for(uint64 ring_index)
prefetch_wait_for(uint64 ring_index, bool sync)
{
PrefetchRequest *entry;
bool result = true;
@@ -712,7 +719,7 @@ prefetch_wait_for(uint64 ring_index)
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
if (!prefetch_read(entry))
if (!prefetch_read(entry, sync))
{
result = false;
break;
@@ -737,7 +744,7 @@ prefetch_wait_for(uint64 ring_index)
* NOTE: this does IO, and can get canceled out-of-line.
*/
static bool
prefetch_read(PrefetchRequest *slot)
prefetch_read(PrefetchRequest *slot, bool sync)
{
NeonResponse *response;
MemoryContext old;
@@ -791,7 +798,7 @@ prefetch_read(PrefetchRequest *slot)
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
if (response->tag == T_NeonGetPageResponse && !sync && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
@@ -801,6 +808,12 @@ prefetch_read(PrefetchRequest *slot)
{
slot->flags |= PRFSF_LFC;
}
else
{
prefetch_set_unused(slot->my_ring_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
}
return true;
}
@@ -1164,7 +1177,7 @@ Retry:
if (!neon_prefetch_response_usable(lsns, slot))
{
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(ring_index))
if (!prefetch_wait_for(ring_index, false))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
@@ -1256,7 +1269,7 @@ Retry:
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == cleanup_index);
if (!prefetch_wait_for(cleanup_index))
if (!prefetch_wait_for(cleanup_index, false))
goto Retry;
prefetch_set_unused(cleanup_index);
pgBufferUsage.prefetch.expired += 1;
@@ -3220,7 +3233,7 @@ Retry:
*/
if (slot->status == PRFS_REQUESTED)
{
if (!prefetch_wait_for(slot->my_ring_index))
if (!prefetch_wait_for(slot->my_ring_index, false))
goto Retry;
}
/* drop caches */
@@ -3258,11 +3271,11 @@ Retry:
Assert(slot->status != PRFS_UNUSED);
Assert(GetPrfSlot(ring_index) == slot);
} while (!prefetch_wait_for(ring_index));
} while (!prefetch_wait_for(ring_index, true));
Assert(slot->status == PRFS_RECEIVED);
Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0);
Assert(hashkey.buftag.blockNum == base_blockno + i);
Assert(hashkey.buftag.blockNum == blockno);
resp = slot->response;
@@ -3278,23 +3291,18 @@ Retry:
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != forkNum ||
getpage_resp->req.blkno != base_blockno + i)
getpage_resp->req.blkno != blockno)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i);
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, blockno);
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
* from page server. But if lfc_store_prefetch_result=false then it is not yet stored in LFC and we have to do it here
* under buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
lfc_write(rinfo, forkNum, blockno, buffer);
break;
}
case T_NeonErrorResponse: