mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Correctly terminate prefetch in case of pageserver restart (#2850)
refer #2819 This patch requires deep knowledge of prefetch internals. So @MMeent please review it or suggest better solution.
This commit is contained in:
committed by
GitHub
parent
328ec1ce24
commit
b9152f1ef4
@@ -32,11 +32,6 @@
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define NEON_TAG "[NEON_SMGR] "
|
||||
#define neon_log(tag, fmt, ...) ereport(tag, \
|
||||
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true)))
|
||||
|
||||
bool connected = false;
|
||||
PGconn *pageserver_conn = NULL;
|
||||
|
||||
@@ -239,6 +234,9 @@ pageserver_receive(void)
|
||||
StringInfoData resp_buff;
|
||||
NeonResponse *resp;
|
||||
|
||||
if (!connected)
|
||||
return NULL;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
/* read response */
|
||||
@@ -248,7 +246,10 @@ pageserver_receive(void)
|
||||
if (resp_buff.len < 0)
|
||||
{
|
||||
if (resp_buff.len == -1)
|
||||
neon_log(ERROR, "end of COPY");
|
||||
{
|
||||
pageserver_disconnect();
|
||||
return NULL;
|
||||
}
|
||||
else if (resp_buff.len == -2)
|
||||
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
|
||||
@@ -49,6 +49,11 @@ typedef struct
|
||||
|
||||
#define messageTag(m) (((const NeonMessage *)(m))->tag)
|
||||
|
||||
#define NEON_TAG "[NEON_SMGR] "
|
||||
#define neon_log(tag, fmt, ...) ereport(tag, \
|
||||
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true)))
|
||||
|
||||
/*
|
||||
* supertype of all the Neon*Request structs below
|
||||
*
|
||||
|
||||
@@ -251,9 +251,9 @@ XLogRecPtr prefetch_lsn = 0;
|
||||
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_read(PrefetchRequest *slot);
|
||||
static bool prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_wait_for(uint64 ring_index);
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
|
||||
@@ -393,7 +393,7 @@ prefetch_cleanup(void)
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*/
|
||||
static void
|
||||
static bool
|
||||
prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
PrefetchRequest *entry;
|
||||
@@ -412,8 +412,10 @@ prefetch_wait_for(uint64 ring_index)
|
||||
entry = GetPrfSlot(MyPState->ring_receive);
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
prefetch_read(entry);
|
||||
if (!prefetch_read(entry))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -425,7 +427,7 @@ prefetch_wait_for(uint64 ring_index)
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*/
|
||||
static void
|
||||
static bool
|
||||
prefetch_read(PrefetchRequest *slot)
|
||||
{
|
||||
NeonResponse *response;
|
||||
@@ -438,15 +440,22 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive();
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
if (response)
|
||||
{
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -746,11 +755,16 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
page_server->send((NeonRequest *) req);
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
consume_prefetch_responses();
|
||||
return page_server->receive();
|
||||
NeonResponse* resp;
|
||||
do {
|
||||
page_server->send((NeonRequest *) req);
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive();
|
||||
} while (resp == NULL);
|
||||
return resp;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1755,22 +1769,24 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
}
|
||||
|
||||
if (entry == NULL)
|
||||
do
|
||||
{
|
||||
n_prefetch_misses += 1;
|
||||
if (entry == NULL)
|
||||
{
|
||||
n_prefetch_misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
slot = GetPrfSlot(ring_index);
|
||||
}
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
slot = GetPrfSlot(ring_index);
|
||||
}
|
||||
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(GetPrfSlot(ring_index) == slot);
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(GetPrfSlot(ring_index) == slot);
|
||||
|
||||
prefetch_wait_for(ring_index);
|
||||
} while (!prefetch_wait_for(ring_index));
|
||||
|
||||
Assert(slot->status == PRFS_RECEIVED);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user