From b9152f1ef45ccf64454b51442eaf6f2161e2c306 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 18 Nov 2022 15:04:58 +0200 Subject: [PATCH] Correctly terminate prefetch in case of pageserver restart (#2850) refer #2819 This patch requires deep knowledge of prefetch internals. So @MMeent please review it or suggest better solution. --- pgxn/neon/libpagestore.c | 13 +++--- pgxn/neon/pagestore_client.h | 5 +++ pgxn/neon/pagestore_smgr.c | 76 ++++++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 36 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index d8e9d8b52c..d7507e69df 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -32,11 +32,6 @@ #define PageStoreTrace DEBUG5 -#define NEON_TAG "[NEON_SMGR] " -#define neon_log(tag, fmt, ...) ereport(tag, \ - (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ - errhidestmt(true), errhidecontext(true))) - bool connected = false; PGconn *pageserver_conn = NULL; @@ -239,6 +234,9 @@ pageserver_receive(void) StringInfoData resp_buff; NeonResponse *resp; + if (!connected) + return NULL; + PG_TRY(); { /* read response */ @@ -248,7 +246,10 @@ pageserver_receive(void) if (resp_buff.len < 0) { if (resp_buff.len == -1) - neon_log(ERROR, "end of COPY"); + { + pageserver_disconnect(); + return NULL; + } else if (resp_buff.len == -2) neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn)); } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 9b8081065c..170a0cb72d 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -49,6 +49,11 @@ typedef struct #define messageTag(m) (((const NeonMessage *)(m))->tag) +#define NEON_TAG "[NEON_SMGR] " +#define neon_log(tag, fmt, ...) ereport(tag, \ + (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true))) + /* * supertype of all the Neon*Request structs below * diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index d6fa7c46c9..d9b45e3933 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -251,9 +251,9 @@ XLogRecPtr prefetch_lsn = 0; static void consume_prefetch_responses(void); static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn); -static void prefetch_read(PrefetchRequest *slot); +static bool prefetch_read(PrefetchRequest *slot); static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn); -static void prefetch_wait_for(uint64 ring_index); +static bool prefetch_wait_for(uint64 ring_index); static void prefetch_cleanup(void); static inline void prefetch_set_unused(uint64 ring_index); @@ -393,7 +393,7 @@ prefetch_cleanup(void) * NOTE: this function may indirectly update MyPState->pfs_hash; which * invalidates any active pointers into the hash table. */ -static void +static bool prefetch_wait_for(uint64 ring_index) { PrefetchRequest *entry; @@ -412,8 +412,10 @@ prefetch_wait_for(uint64 ring_index) entry = GetPrfSlot(MyPState->ring_receive); Assert(entry->status == PRFS_REQUESTED); - prefetch_read(entry); + if (!prefetch_read(entry)) + return false; } + return true; } /* @@ -425,7 +427,7 @@ prefetch_wait_for(uint64 ring_index) * NOTE: this function may indirectly update MyPState->pfs_hash; which * invalidates any active pointers into the hash table. */ -static void +static bool prefetch_read(PrefetchRequest *slot) { NeonResponse *response; @@ -438,15 +440,22 @@ prefetch_read(PrefetchRequest *slot) old = MemoryContextSwitchTo(MyPState->errctx); response = (NeonResponse *) page_server->receive(); MemoryContextSwitchTo(old); - - /* update prefetch state */ - MyPState->n_responses_buffered += 1; - MyPState->n_requests_inflight -= 1; - MyPState->ring_receive += 1; + if (response) + { + /* update prefetch state */ + MyPState->n_responses_buffered += 1; + MyPState->n_requests_inflight -= 1; + MyPState->ring_receive += 1; - /* update slot state */ - slot->status = PRFS_RECEIVED; - slot->response = response; + /* update slot state */ + slot->status = PRFS_RECEIVED; + slot->response = response; + return true; + } + else + { + return false; + } } /* @@ -746,11 +755,16 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls static NeonResponse * page_server_request(void const *req) { - page_server->send((NeonRequest *) req); - page_server->flush(); - MyPState->ring_flush = MyPState->ring_unused; - consume_prefetch_responses(); - return page_server->receive(); + NeonResponse* resp; + do { + page_server->send((NeonRequest *) req); + page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; + consume_prefetch_responses(); + resp = page_server->receive(); + } while (resp == NULL); + return resp; + } @@ -1755,22 +1769,24 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, } } - if (entry == NULL) + do { - n_prefetch_misses += 1; + if (entry == NULL) + { + n_prefetch_misses += 1; - ring_index = prefetch_register_buffer(buftag, &request_latest, - &request_lsn); - slot = GetPrfSlot(ring_index); - } + ring_index = prefetch_register_buffer(buftag, &request_latest, + &request_lsn); + slot = GetPrfSlot(ring_index); + } - Assert(slot->my_ring_index == ring_index); - Assert(MyPState->ring_last <= ring_index && - MyPState->ring_unused > ring_index); - Assert(slot->status != PRFS_UNUSED); - Assert(GetPrfSlot(ring_index) == slot); + Assert(slot->my_ring_index == ring_index); + Assert(MyPState->ring_last <= ring_index && + MyPState->ring_unused > ring_index); + Assert(slot->status != PRFS_UNUSED); + Assert(GetPrfSlot(ring_index) == slot); - prefetch_wait_for(ring_index); + } while (!prefetch_wait_for(ring_index)); Assert(slot->status == PRFS_RECEIVED);