mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Store prefetch results in LFC cache once as soon as they are received (#10442)
## Problem Prefetch is performed locally, so different backers can request the same pages form PS. Such duplicated request increase load of page server and network traffic. Making prefetch global seems to be very difficult and undesirable, because different queries can access chunks on different speed. Storing prefetch chunks in LFC will not completely eliminate duplicates, but can minimise such requests. The problem with storing prefetch result in LFC is that in this case page is not protected by share buffer lock. So we will have to perform extra synchronisation at LFC side. See: https://neondb.slack.com/archives/C0875PUD0LC/p1736772890602029?thread_ts=1736762541.116949&cid=C0875PUD0LC @MMeent implementation of prewarm: See https://github.com/neondatabase/neon/pull/10312/ ## Summary of changes Use conditional variables to sycnhronize access to LFC entry. --------- Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
committed by
GitHub
parent
3e82addd64
commit
b1d8771d5f
File diff suppressed because it is too large
Load Diff
@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
|
||||
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
|
||||
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
|
||||
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
|
||||
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
|
||||
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
|
||||
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
|
||||
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");
|
||||
|
||||
@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
|
||||
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
|
||||
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
|
||||
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION
|
||||
|
||||
@@ -233,6 +233,7 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern bool lfc_store_prefetch_result;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
@@ -301,14 +302,16 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno);
|
||||
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, int nblocks, bits8 *bitmap);
|
||||
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
const void* buffer, XLogRecPtr lsn);
|
||||
|
||||
|
||||
static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
void *buffer)
|
||||
{
|
||||
bits8 rv = 0;
|
||||
bits8 rv = 1;
|
||||
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ static uint32 local_request_counter;
|
||||
* UNUSED ------> REQUESTED --> RECEIVED
|
||||
* ^ : | |
|
||||
* | : v |
|
||||
* | : TAG_UNUSED |
|
||||
* | : TAG_REMAINS |
|
||||
* | : | |
|
||||
* +----------------+------------+
|
||||
* :
|
||||
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
|
||||
/* must fit in uint8; bits 0x1 are used */
|
||||
typedef enum {
|
||||
PRFSF_NONE = 0x0,
|
||||
PRFSF_SEQ = 0x1,
|
||||
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
|
||||
} PrefetchRequestFlags;
|
||||
|
||||
typedef struct PrefetchRequest
|
||||
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
|
||||
BlockNumber blkno, neon_request_lsns *output,
|
||||
BlockNumber nblocks, const bits8 *mask);
|
||||
BlockNumber nblocks);
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
|
||||
@@ -363,6 +363,7 @@ compact_prefetch_buffers(void)
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->flags = source_slot->flags;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->reqid = source_slot->reqid;
|
||||
target_slot->request_lsns = source_slot->request_lsns;
|
||||
@@ -452,6 +453,18 @@ prefetch_pump_state(void)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -713,6 +726,18 @@ prefetch_read(PrefetchRequest *slot)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
@@ -864,7 +889,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
else
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1, NULL);
|
||||
&slot->request_lsns, 1);
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -890,6 +915,73 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
*/
|
||||
static int
|
||||
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
|
||||
BlockNumber nblocks, void **buffers, bits8 *mask)
|
||||
{
|
||||
int hits = 0;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
/*
|
||||
* Use an intermediate PrefetchRequest struct as the hash key to ensure
|
||||
* correct alignment and that the padding bytes are cleared.
|
||||
*/
|
||||
memset(&hashkey.buftag, 0, sizeof(BufferTag));
|
||||
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
|
||||
hashkey.buftag.forkNum = forknum;
|
||||
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
PrefetchRequest *slot = entry->slot;
|
||||
uint64 ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(ring_index));
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
|
||||
|
||||
if (slot->status != PRFS_RECEIVED)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* If the caller specified a request LSN to use, only accept
|
||||
* prefetch responses that satisfy that request.
|
||||
*/
|
||||
if (!neon_prefetch_response_usable(&lsns[i], slot))
|
||||
continue;
|
||||
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
prefetch_set_unused(ring_index);
|
||||
BITMAP_SET(mask, i);
|
||||
|
||||
hits += 1;
|
||||
}
|
||||
}
|
||||
pgBufferUsage.prefetch.hits += hits;
|
||||
return hits;
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
static bool
|
||||
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
|
||||
{
|
||||
bits8 present = 0;
|
||||
return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* prefetch_register_bufferv() - register and prefetch buffers
|
||||
*
|
||||
@@ -1013,8 +1105,6 @@ Retry:
|
||||
/* The buffered request is good enough, return that index */
|
||||
if (is_prefetch)
|
||||
pgBufferUsage.prefetch.duplicates++;
|
||||
else
|
||||
pgBufferUsage.prefetch.hits++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1116,6 +1206,7 @@ Retry:
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
@@ -2056,8 +2147,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
*/
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *output, BlockNumber nblocks,
|
||||
const bits8 *mask)
|
||||
neon_request_lsns *output, BlockNumber nblocks)
|
||||
{
|
||||
XLogRecPtr last_written_lsns[PG_IOV_MAX];
|
||||
|
||||
@@ -2145,9 +2235,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
if (last_written_lsn > replay_lsn)
|
||||
{
|
||||
/* GetCurrentReplayRecPtr was introduced in v15 */
|
||||
@@ -2190,8 +2277,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
@@ -2413,7 +2498,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.hdr.tag = T_NeonExistsRequest,
|
||||
@@ -2832,8 +2917,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
while (nblocks > 0)
|
||||
{
|
||||
int iterblocks = Min(nblocks, PG_IOV_MAX);
|
||||
bits8 lfc_present[PG_IOV_MAX / 8];
|
||||
memset(lfc_present, 0, sizeof(lfc_present));
|
||||
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
|
||||
|
||||
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
iterblocks, lfc_present) == iterblocks)
|
||||
@@ -2844,12 +2928,13 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
}
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
nblocks -= iterblocks;
|
||||
blocknum += iterblocks;
|
||||
|
||||
@@ -3105,7 +3190,8 @@ Retry:
|
||||
}
|
||||
}
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
if (!lfc_store_prefetch_result)
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
@@ -3190,6 +3276,17 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
|
||||
{
|
||||
/* Prefetch hit */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Try to read from local file cache */
|
||||
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
|
||||
{
|
||||
@@ -3197,9 +3294,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3280,11 +3379,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static void
|
||||
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
void **buffers, BlockNumber nblocks)
|
||||
{
|
||||
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
|
||||
bits8 lfc_hits[PG_IOV_MAX / 8];
|
||||
bits8 read[PG_IOV_MAX / 8];
|
||||
neon_request_lsns request_lsns[PG_IOV_MAX];
|
||||
int lfc_result;
|
||||
int prefetch_result;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -3307,38 +3409,52 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
neon_log(ERROR, "Read request too large: %d is larger than max %d",
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
memset(read, 0, sizeof(read));
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
|
||||
|
||||
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
|
||||
|
||||
if (prefetch_result == nblocks)
|
||||
return;
|
||||
|
||||
/* invert the result: exclude prefetched blocks */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_hits[i] = ~prefetch_hits[i];
|
||||
|
||||
/* Try to read from local file cache */
|
||||
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
|
||||
nblocks, read);
|
||||
nblocks, lfc_hits);
|
||||
|
||||
if (lfc_result > 0)
|
||||
MyNeonCounters->file_cache_hits_total += lfc_result;
|
||||
|
||||
/* Read all blocks from LFC, so we're done */
|
||||
if (lfc_result == nblocks)
|
||||
if (prefetch_result + lfc_result == nblocks)
|
||||
return;
|
||||
|
||||
if (lfc_result == -1)
|
||||
if (lfc_result <= 0)
|
||||
{
|
||||
/* can't use the LFC result, so read all blocks from PS */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = 0xFF;
|
||||
read[i] = ~prefetch_hits[i];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* invert the result: exclude blocks read from lfc */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~(read[i]);
|
||||
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks, read);
|
||||
|
||||
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
|
||||
buffers, nblocks, read);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3610,7 +3726,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
@@ -3695,7 +3811,7 @@ neon_dbsize(Oid dbNode)
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
@@ -4430,7 +4546,12 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
if (no_redo_needed)
|
||||
{
|
||||
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
|
||||
lfc_evict(rinfo, forknum, blkno);
|
||||
/*
|
||||
* Redo changes if page exists in LFC.
|
||||
* We should perform this check after assigning LwLSN to prevent
|
||||
* prefetching of some older version of the page by some other backend.
|
||||
*/
|
||||
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
101
test_runner/regress/test_lfc_prefetch.py
Normal file
101
test_runner/regress/test_lfc_prefetch.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import USE_LFC
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prefetch(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test resizing the Local File Cache
|
||||
"""
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"effective_io_concurrency=100",
|
||||
"shared_buffers=1MB",
|
||||
"enable_bitmapscan=off",
|
||||
"enable_seqscan=off",
|
||||
"autovacuum=off",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon")
|
||||
cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))")
|
||||
cur.execute("set statement_timeout=0")
|
||||
cur.execute("select setseed(0.5)")
|
||||
cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)")
|
||||
cur.execute("create index on t(sk)")
|
||||
cur.execute("vacuum t")
|
||||
|
||||
# reset LFC
|
||||
cur.execute("alter system set neon.file_cache_size_limit=0")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
time.sleep(1)
|
||||
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s1"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 200000 and 300000 limit 100) s2"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 300000 and 400000 limit 100) s3"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s4"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
# if prefetch requests are not stored in LFC, we continue to sent unused prefetch request tyo PS
|
||||
assert prefetch_expired > 0
|
||||
|
||||
cur.execute("set neon.store_prefetch_result_in_lfc=on")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s5"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 600000 and 700000 limit 100) s6"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 700000 and 800000 limit 100) s7"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
cur.execute(
|
||||
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s8"
|
||||
)
|
||||
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
|
||||
log.info(f"Unused prefetches: {prefetch_expired}")
|
||||
|
||||
# No redundant prefethc requrests if prefetch results are stored in LFC
|
||||
assert prefetch_expired == 0
|
||||
Reference in New Issue
Block a user