diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 606af9741f..4fdc7f8c82 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -34,7 +34,6 @@ #define PageStoreTrace DEBUG5 -#define MAX_RECONNECT_ATTEMPTS 5 #define RECONNECT_INTERVAL_USEC 1000000 bool connected = false; @@ -55,13 +54,15 @@ int32 max_cluster_size; char *page_server_connstring; char *neon_auth_token; -int n_unflushed_requests = 0; -int flush_every_n_requests = 8; int readahead_buffer_size = 128; +int flush_every_n_requests = 8; + +int n_reconnect_attempts = 0; +int max_reconnect_attempts = 60; bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; -static void pageserver_flush(void); +static bool pageserver_flush(void); static bool pageserver_connect(int elevel) @@ -232,16 +233,17 @@ pageserver_disconnect(void) } } -static void +static bool pageserver_send(NeonRequest * request) { StringInfoData req_buff; - int n_reconnect_attempts = 0; /* If the connection was lost for some reason, reconnect */ if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) + { + neon_log(LOG, "pageserver_send disconnect bad connection"); pageserver_disconnect(); - + } req_buff = nm_pack_request(request); @@ -252,53 +254,36 @@ pageserver_send(NeonRequest * request) * See https://github.com/neondatabase/neon/issues/1138 * So try to reestablish connection in case of failure. */ - while (true) + if (!connected) { - if (!connected) + while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { - if (!pageserver_connect(n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS ? LOG : ERROR)) - { - n_reconnect_attempts += 1; - pg_usleep(RECONNECT_INTERVAL_USEC); - continue; - } + n_reconnect_attempts += 1; + pg_usleep(RECONNECT_INTERVAL_USEC); } + n_reconnect_attempts = 0; + } - /* - * Send request. - * - * In principle, this could block if the output buffer is full, and we - * should use async mode and check for interrupts while waiting. In - * practice, our requests are small enough to always fit in the output and - * TCP buffer. - */ - if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) - { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); - if (n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS) - { - neon_log(LOG, "failed to send page request (try to reconnect): %s", msg); - if (n_reconnect_attempts != 0) /* do not sleep before first reconnect attempt, assuming that pageserver is already restarted */ - pg_usleep(RECONNECT_INTERVAL_USEC); - n_reconnect_attempts += 1; - continue; - } - else - { - pageserver_disconnect(); - neon_log(ERROR, "failed to send page request: %s", msg); - } - } - break; + /* + * Send request. + * + * In principle, this could block if the output buffer is full, and we + * should use async mode and check for interrupts while waiting. In + * practice, our requests are small enough to always fit in the output and + * TCP buffer. + */ + if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); + neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); + pfree(msg); + pfree(req_buff.data); + return false; } pfree(req_buff.data); - n_unflushed_requests++; - - if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests) - pageserver_flush(); - if (message_level_is_interesting(PageStoreTrace)) { char *msg = nm_to_string((NeonMessage *) request); @@ -306,6 +291,7 @@ pageserver_send(NeonRequest * request) neon_log(PageStoreTrace, "sent request: %s", msg); pfree(msg); } + return true; } static NeonResponse * @@ -340,16 +326,25 @@ pageserver_receive(void) } else if (rc == -1) { + neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); pageserver_disconnect(); resp = NULL; } else if (rc == -2) - neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn))); + { + char* msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); + neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); + } else - neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc); + { + pageserver_disconnect(); + neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); + } } PG_CATCH(); { + neon_log(LOG, "pageserver_receive disconnect due to caught exception"); pageserver_disconnect(); PG_RE_THROW(); } @@ -359,21 +354,25 @@ pageserver_receive(void) } -static void +static bool pageserver_flush(void) { if (!connected) { neon_log(WARNING, "Tried to flush while disconnected"); } - else if (PQflush(pageserver_conn)) + else { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); - - pageserver_disconnect(); - neon_log(ERROR, "failed to flush page requests: %s", msg); + if (PQflush(pageserver_conn)) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + pageserver_disconnect(); + neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); + pfree(msg); + return false; + } } - n_unflushed_requests = 0; + return true; } page_server_api api = { @@ -439,6 +438,14 @@ pg_init_libpagestore(void) PGC_USERSET, 0, /* no flags required */ NULL, NULL, NULL); + DefineCustomIntVariable("neon.max_reconnect_attempts", + "Maximal attempts to reconnect to pages server (with 1 second timeout)", + NULL, + &max_reconnect_attempts, + 10, 0, INT_MAX, + PGC_USERSET, + 0, + NULL, NULL, NULL); DefineCustomIntVariable("neon.readahead_buffer_size", "number of prefetches to buffer", "This buffer is used to hold and manage prefetched " diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 8257b90ac3..2889db49bc 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -145,9 +145,9 @@ extern char *nm_to_string(NeonMessage * msg); typedef struct { - void (*send) (NeonRequest * request); + bool (*send) (NeonRequest * request); NeonResponse *(*receive) (void); - void (*flush) (void); + bool (*flush) (void); } page_server_api; extern void prefetch_on_ps_disconnect(void); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 79dec0881d..76d71dd94b 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -489,7 +489,8 @@ prefetch_wait_for(uint64 ring_index) if (MyPState->ring_flush <= ring_index && MyPState->ring_unused > MyPState->ring_flush) { - page_server->flush(); + if (!page_server->flush()) + return false; MyPState->ring_flush = MyPState->ring_unused; } @@ -666,7 +667,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force * smaller than the current WAL insert/redo pointer, which is already * larger than this prefetch_lsn. So in any case, that would * invalidate this cache. - * + * * The best LSN to use for effective_request_lsn would be * XLogCtl->Insert.RedoRecPtr, but that's expensive to access. */ @@ -677,7 +678,8 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); - page_server->send((NeonRequest *) &request); + + while (!page_server->send((NeonRequest *) &request)); /* update prefetch state */ MyPState->n_requests_inflight += 1; @@ -687,6 +689,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force /* update slot state */ slot->status = PRFS_REQUESTED; + prfh_insert(MyPState->prf_hash, slot, &found); Assert(!found); } @@ -743,6 +746,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls prefetch_set_unused(ring_index); entry = NULL; } + } /* if we don't want the latest version, only accept requests with the exact same LSN */ else @@ -756,20 +760,23 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls } } - /* - * We received a prefetch for a page that was recently read and - * removed from the buffers. Remove that request from the buffers. - */ - else if (slot->status == PRFS_TAG_REMAINS) + if (entry != NULL) { - prefetch_set_unused(ring_index); - entry = NULL; - } - else - { - /* The buffered request is good enough, return that index */ - pgBufferUsage.prefetch.duplicates++; - return ring_index; + /* + * We received a prefetch for a page that was recently read and + * removed from the buffers. Remove that request from the buffers. + */ + if (slot->status == PRFS_TAG_REMAINS) + { + prefetch_set_unused(ring_index); + entry = NULL; + } + else + { + /* The buffered request is good enough, return that index */ + pgBufferUsage.prefetch.duplicates++; + return ring_index; + } } } @@ -859,8 +866,7 @@ page_server_request(void const *req) { NeonResponse* resp; do { - page_server->send((NeonRequest *) req); - page_server->flush(); + while (!page_server->send((NeonRequest *) req) || !page_server->flush()); MyPState->ring_flush = MyPState->ring_unused; consume_prefetch_responses(); resp = page_server->receive();