diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index e059a7ad06..20934254f8 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -124,13 +124,6 @@ typedef struct FileCacheEntry dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; -typedef struct PrewarmRequest -{ - NeonRequestId reqid; - XLogRecPtr lsn; - XLogRecPtr not_modified_since; -} PrewarmRequest; - #define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3) #define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2)) @@ -614,9 +607,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) { size_t snd_idx = 0, rcv_idx = 0; size_t n_sent = 0, n_received = 0; - int shard_no; - PrewarmRequest* ring; - size_t ring_size = pg_nextpower2_32(lfc_prewarm_batch); + bool save_lfc_store_prefetch_result; if (!lfc_ensure_opened()) return; @@ -654,57 +645,30 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) LWLockRelease(lfc_lock); - - ring = (PrewarmRequest*)palloc(sizeof(PrewarmRequest)*ring_size); + /* enable prefetch in LFC */ + save_lfc_store_prefetch_result = lfc_store_prefetch_result; + lfc_store_prefetch_result = true; elog(LOG, "LFC: start loading %ld chunks", (long)n_entries); while (true) { + BufferTag tag; size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK; BlockNumber offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK; if (chunk_no < n_entries) { if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))) { - /* - * In case of prewarming replica we should be careful not to load too new version - * of the page - with LSN larger than current replay LSN. - * At primary we are always loading latest version. - */ - XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX; - - NeonGetPageRequest request = { - .hdr.tag = T_NeonGetPageRequest, - .hdr.lsn = req_lsn, - /* not_modified_since is filled in below */ - .rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key), - .forknum = fs[chunk_no].key.forkNum, - .blkno = fs[chunk_no].key.blockNum + offs_in_chunk, - }; - shard_no = get_shard_number(&fs[chunk_no].key); - request.hdr.not_modified_since = GetLastWrittenLSN(request.rinfo, request.forknum, request.blkno); - - while (!page_server->send(shard_no, (NeonRequest *) &request) - || !page_server->flush(shard_no)) - { - /* page server disconnected: all previusly sent prefetch requests are lost */ - n_sent = 0; - n_received = 0; - } - ring[n_sent & (ring_size-1)].reqid = request.hdr.reqid; - ring[n_sent & (ring_size-1)].lsn = request.hdr.lsn; - ring[n_sent & (ring_size-1)].not_modified_since = request.hdr.not_modified_since; + tag = fs[chunk_no].key; + tag.blockNum += offs_in_chunk; + (void)prefetch_register_bufferv(tag, NULL, 1, NULL, true); n_sent += 1; } snd_idx += 1; } if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries) { - NRelFileInfo rinfo; - NeonResponse* resp; - PrewarmRequest* req = &ring[n_received & (ring_size-1)]; - do { chunk_no = rcv_idx / BLOCKS_PER_CHUNK; @@ -712,54 +676,11 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) rcv_idx += 1; } while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))); - shard_no = get_shard_number(&fs[chunk_no].key); - resp = page_server->receive(shard_no); lfc_ctl->prewarm_curr_chunk = chunk_no; - rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key); - switch (resp->tag) - { - case T_NeonGetPageResponse: - if (neon_protocol_version >= 3) - { - NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (resp->reqid != req->reqid || - resp->lsn != req->lsn || - resp->not_modified_since != req->not_modified_since || - !RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) || - getpage_resp->req.forknum != fs[chunk_no].key.forkNum || - getpage_resp->req.blkno != fs[chunk_no].key.blockNum + offs_in_chunk) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno, - req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since), RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk); - } - } - break; - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != req->reqid || - resp->lsn != req->lsn || - resp->not_modified_since != req->not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - req->reqid, LSN_FORMAT_ARGS(req->lsn), LSN_FORMAT_ARGS(req->not_modified_since)); - } - } - /* Prefech can request page which is already dropped so PS can respond with error: just ignore it */ - elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s", - fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(rinfo), fs[chunk_no].key.forkNum, ((NeonErrorResponse *) resp)->message); - goto next_block; - default: - elog(LOG, "LFC: unexpected response type: %d", resp->tag); - return; - } - - if (lfc_prefetch(rinfo, fs[chunk_no].key.forkNum, fs[chunk_no].key.blockNum + offs_in_chunk, - ((NeonGetPageResponse*)resp)->page, req->not_modified_since)) + tag = fs[chunk_no].key; + tag.blockNum += offs_in_chunk; + if (prefetch_receive(tag)) { lfc_ctl->prewarmed_pages += 1; } @@ -767,7 +688,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) { lfc_ctl->skipped_pages += 1; } - next_block: + if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK) { break; @@ -775,7 +696,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) } } Assert(n_sent == n_received); - pfree(ring); + lfc_store_prefetch_result = save_lfc_store_prefetch_result; lfc_ctl->prewarm_curr_chunk = n_entries; elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received); } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 8c25958e5b..e5efd01a6d 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -58,6 +58,17 @@ typedef struct #define messageTag(m) (((const NeonMessage *)(m))->tag) +<<<<<<< HEAD +======= +#define NEON_TAG "[NEON_SMGR] " +#define neon_log(tag, fmt, ...) ereport(tag, \ + (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) +#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \ + (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) + +>>>>>>> 7dbb65404 (Use standard prefetch mechanism for geting prewarm results fropm page server) /* SLRUs downloadable from page server */ typedef enum { SLRU_CLOG, @@ -281,4 +292,6 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size); extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum); + #endif /* PAGESTORE_CLIENT_H */ +k