mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Use standard prefetch mechanism for getting prewarm results from page server
This commit is contained in:
@@ -124,13 +124,6 @@ typedef struct FileCacheEntry
|
||||
dlist_node list_node; /* LRU/holes list node */
|
||||
} FileCacheEntry;
|
||||
|
||||
/*
 * Bookkeeping for one get-page request issued by lfc_prewarm() and kept in
 * its ring buffer. The fields are copied from the request header when the
 * request is sent and are compared against the response header when the
 * matching response is received.
 */
typedef struct PrewarmRequest
|
||||
{
|
||||
NeonRequestId reqid; /* copy of request.hdr.reqid */
|
||||
XLogRecPtr lsn; /* copy of request.hdr.lsn */
|
||||
XLogRecPtr not_modified_since; /* copy of request.hdr.not_modified_since */
|
||||
} PrewarmRequest;
|
||||
|
||||
/* Extract the 2-bit state of slot i; 16 slots are packed into each element of entry->state. */
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> (((i) % 16) * 2)) & 0x3)
|
||||
/*
 * Store the 2-bit value new_state into slot i of entry->state (16 slots per
 * array element), leaving the other slots of that element untouched.
 *
 * The mask and the value are shifted as unsigned quantities: slot 15 needs a
 * shift by 30 bits, and shifting the signed int constant 3 (or a signed
 * new_state) that far overflows int. The whole expansion is parenthesized so
 * that it behaves as a single expression.
 *
 * Note: entry and i are evaluated more than once.
 */
#define SET_STATE(entry, i, new_state) \
	((entry)->state[(i) / 16] = \
		((entry)->state[(i) / 16] & ~(3U << ((i) % 16 * 2))) | \
		((unsigned int) (new_state) << ((i) % 16 * 2)))
|
||||
|
||||
@@ -614,9 +607,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
{
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
size_t n_sent = 0, n_received = 0;
|
||||
int shard_no;
|
||||
PrewarmRequest* ring;
|
||||
size_t ring_size = pg_nextpower2_32(lfc_prewarm_batch);
|
||||
bool save_lfc_store_prefetch_result;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
@@ -654,57 +645,30 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
|
||||
ring = (PrewarmRequest*)palloc(sizeof(PrewarmRequest)*ring_size);
|
||||
/* enable prefetch in LFC */
|
||||
save_lfc_store_prefetch_result = lfc_store_prefetch_result;
|
||||
lfc_store_prefetch_result = true;
|
||||
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
|
||||
|
||||
while (true)
|
||||
{
|
||||
BufferTag tag;
|
||||
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
|
||||
BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
|
||||
if (chunk_no < n_entries)
|
||||
{
|
||||
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
|
||||
{
|
||||
/*
|
||||
* In case of prewarming replica we should be careful not to load too new version
|
||||
* of the page - with LSN larger than current replay LSN.
|
||||
* At primary we are always loading latest version.
|
||||
*/
|
||||
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
|
||||
|
||||
NeonGetPageRequest request = {
|
||||
.hdr.tag = T_NeonGetPageRequest,
|
||||
.hdr.lsn = req_lsn,
|
||||
/* not_modified_since is filled in below */
|
||||
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
|
||||
.forknum = fs[chunk_no].key.forkNum,
|
||||
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
|
||||
};
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
request.hdr.not_modified_since = GetLastWrittenLSN(request.rinfo, request.forknum, request.blkno);
|
||||
|
||||
while (!page_server->send(shard_no, (NeonRequest *) &request)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* page server disconnected: all previously sent prefetch requests are lost */
|
||||
n_sent = 0;
|
||||
n_received = 0;
|
||||
}
|
||||
ring[n_sent & (ring_size-1)].reqid = request.hdr.reqid;
|
||||
ring[n_sent & (ring_size-1)].lsn = request.hdr.lsn;
|
||||
ring[n_sent & (ring_size-1)].not_modified_since = request.hdr.not_modified_since;
|
||||
tag = fs[chunk_no].key;
|
||||
tag.blockNum += offs_in_chunk;
|
||||
(void)prefetch_register_bufferv(tag, NULL, 1, NULL, true);
|
||||
n_sent += 1;
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
|
||||
{
|
||||
NRelFileInfo rinfo;
|
||||
NeonResponse* resp;
|
||||
PrewarmRequest* req = &ring[n_received & (ring_size-1)];
|
||||
|
||||
do
|
||||
{
|
||||
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
|
||||
@@ -712,54 +676,11 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
rcv_idx += 1;
|
||||
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
|
||||
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
resp = page_server->receive(shard_no);
|
||||
lfc_ctl->prewarm_curr_chunk = chunk_no;
|
||||
rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key);
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
if (neon_protocol_version >= 3)
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
if (resp->reqid != req->reqid ||
|
||||
resp->lsn != req->lsn ||
|
||||
resp->not_modified_since != req->not_modified_since ||
|
||||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
|
||||
getpage_resp->req.forknum != fs[chunk_no].key.forkNum ||
|
||||
getpage_resp->req.blkno != fs[chunk_no].key.blockNum + offs_in_chunk)
|
||||
{
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
|
||||
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
|
||||
req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since), RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case T_NeonErrorResponse:
|
||||
if (neon_protocol_version >= 3)
|
||||
{
|
||||
if (resp->reqid != req->reqid ||
|
||||
resp->lsn != req->lsn ||
|
||||
resp->not_modified_since != req->not_modified_since)
|
||||
{
|
||||
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
|
||||
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
|
||||
req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since));
|
||||
}
|
||||
}
|
||||
/* Prefetch can request a page which is already dropped, so PS can respond with an error: just ignore it */
|
||||
elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s",
|
||||
fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, ((NeonErrorResponse *) resp)->message);
|
||||
goto next_block;
|
||||
default:
|
||||
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lfc_prefetch(rinfo, fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk,
|
||||
((NeonGetPageResponse*)resp)->page, req->not_modified_since))
|
||||
tag = fs[chunk_no].key;
|
||||
tag.blockNum += offs_in_chunk;
|
||||
if (prefetch_receive(tag))
|
||||
{
|
||||
lfc_ctl->prewarmed_pages += 1;
|
||||
}
|
||||
@@ -767,7 +688,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
{
|
||||
lfc_ctl->skipped_pages += 1;
|
||||
}
|
||||
next_block:
|
||||
|
||||
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
|
||||
{
|
||||
break;
|
||||
@@ -775,7 +696,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
}
|
||||
}
|
||||
Assert(n_sent == n_received);
|
||||
pfree(ring);
|
||||
lfc_store_prefetch_result = save_lfc_store_prefetch_result;
|
||||
lfc_ctl->prewarm_curr_chunk = n_entries;
|
||||
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
|
||||
}
|
||||
|
||||
@@ -58,6 +58,17 @@ typedef struct
|
||||
|
||||
/* Read the tag field of message m, viewing m as a pointer to const NeonMessage. */
#define messageTag(m) (((const NeonMessage *)(m))->tag)
|
||||
|
||||
/*
 * Logging helpers that prefix every message with NEON_TAG.
 *
 * neon_log(tag, fmt, ...) passes tag as the first argument of ereport() and
 * formats the message with errmsg(); neon_shard_log() additionally prints
 * "[shard N] " after the prefix. Both call errhidestmt(true),
 * errhidecontext(true), errposition(0) and internalerrposition(0).
 *
 * Each macro is defined only if it is not defined yet: this region used to
 * contain unresolved merge-conflict markers whose HEAD side was empty, so
 * the same macros may already be provided by another header.
 */
#ifndef NEON_TAG
#define NEON_TAG "[NEON_SMGR] "
#endif

#ifndef neon_log
#define neon_log(tag, fmt, ...) ereport(tag, \
	(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
	 errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#endif

#ifndef neon_shard_log
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
	(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
	 errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#endif
|
||||
/* SLRUs downloadable from page server */
|
||||
typedef enum {
|
||||
SLRU_CLOG,
|
||||
@@ -281,4 +292,6 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
|
||||
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
|
||||
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
|
||||
|
||||
|
||||
#endif /* PAGESTORE_CLIENT_H */
|
||||
|
||||
Reference in New Issue
Block a user