diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index e89ffdb628..f73c409f98 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -847,9 +847,9 @@ HandleElectedProposer(WalProposer *wp) * otherwise we must be sync-safekeepers and we have nothing to do then. * * Proceeding is not only pointless but harmful, because we'd give - * safekeepers term history starting with 0/0. These hacks will go away once - * we disable implicit timeline creation on safekeepers and create it with - * non zero LSN from the start. + * safekeepers term history starting with 0/0. These hacks will go away + * once we disable implicit timeline creation on safekeepers and create it + * with non zero LSN from the start. */ if (wp->propEpochStartLsn == InvalidXLogRecPtr) { @@ -1052,10 +1052,11 @@ DetermineEpochStartLsn(WalProposer *wp) if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp)) { /* - * However, allow to proceed if last_log_term on the node which gave - * the highest vote (i.e. point where we are going to start writing) - * actually had been won by me; plain restart of walproposer not - * intervened by concurrent compute which wrote WAL is ok. + * However, allow to proceed if last_log_term on the node which + * gave the highest vote (i.e. point where we are going to start + * writing) actually had been won by me; plain restart of + * walproposer not intervened by concurrent compute which wrote + * WAL is ok. * * This avoids compute crash after manual term_bump. */ @@ -1366,14 +1367,17 @@ SendAppendRequests(Safekeeper *sk) req = &sk->appendRequest; req_len = req->endLsn - req->beginLsn; - /* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */ + /* + * We send zero sized AppenRequests as heartbeats; don't wal_read + * for these. + */ if (req_len > 0) { switch (wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req_len, - &errmsg)) + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req_len, + &errmsg)) { case NEON_WALREAD_SUCCESS: break; @@ -1381,7 +1385,7 @@ SendAppendRequests(Safekeeper *sk) return true; case NEON_WALREAD_ERROR: wp_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, errmsg); + sk->host, sk->port, errmsg); ShutdownConnection(sk); return false; default: @@ -1469,11 +1473,11 @@ RecvAppendResponses(Safekeeper *sk) * Term has changed to higher one, probably another compute is * running. If this is the case we could PANIC as well because * likely it inserted some data and our basebackup is unsuitable - * anymore. However, we also bump term manually (term_bump endpoint) - * on safekeepers for migration purposes, in this case we do want - * compute to stay alive. So restart walproposer with FATAL instead - * of panicking; if basebackup is spoiled next election will notice - * this. + * anymore. However, we also bump term manually (term_bump + * endpoint) on safekeepers for migration purposes, in this case + * we do want compute to stay alive. So restart walproposer with + * FATAL instead of panicking; if basebackup is spoiled next + * election will notice this. */ wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us", sk->host, sk->port, diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 86444084ff..faea20291f 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -225,19 +225,21 @@ nwp_register_gucs(void) static int split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) { - int n_safekeepers = 0; - char *curr_sk = safekeepers_list; + int n_safekeepers = 0; + char *curr_sk = safekeepers_list; for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma) { - if (++n_safekeepers >= MAX_SAFEKEEPERS) { + if (++n_safekeepers >= MAX_SAFEKEEPERS) + { wpg_log(FATAL, "too many safekeepers"); } coma = strchr(coma, ','); - safekeepers[n_safekeepers-1] = curr_sk; + safekeepers[n_safekeepers - 1] = curr_sk; - if (coma != NULL) { + if (coma != NULL) + { *coma++ = '\0'; } } @@ -252,10 +254,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) static bool safekeepers_cmp(char *old, char *new) { - char *safekeepers_old[MAX_SAFEKEEPERS]; - char *safekeepers_new[MAX_SAFEKEEPERS]; - int len_old = 0; - int len_new = 0; + char *safekeepers_old[MAX_SAFEKEEPERS]; + char *safekeepers_new[MAX_SAFEKEEPERS]; + int len_old = 0; + int len_new = 0; len_old = split_safekeepers_list(old, safekeepers_old); len_new = split_safekeepers_list(new, safekeepers_new); @@ -292,7 +294,8 @@ assign_neon_safekeepers(const char *newval, void *extra) if (!am_walproposer) return; - if (!newval) { + if (!newval) + { /* should never happen */ wpg_log(FATAL, "neon.safekeepers is empty"); } @@ -301,11 +304,11 @@ assign_neon_safekeepers(const char *newval, void *extra) newval_copy = pstrdup(newval); oldval = pstrdup(wal_acceptors_list); - /* + /* * TODO: restarting through FATAL is stupid and introduces 1s delay before - * next bgw start. We should refactor walproposer to allow graceful exit and - * thus remove this delay. - * XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder. + * next bgw start. We should refactor walproposer to allow graceful exit + * and thus remove this delay. XXX: If you change anything here, sync with + * test_safekeepers_reconfigure_reorder. */ if (!safekeepers_cmp(oldval, newval_copy)) { @@ -454,7 +457,8 @@ backpressure_throttling_impl(void) memcpy(new_status, old_status, len); snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag); set_ps_display(new_status); - new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */ + new_status[len] = '\0'; /* truncate off " backpressure ..." to later + * reset the ps */ elog(DEBUG2, "backpressure throttling: lag %lu", lag); start = GetCurrentTimestamp(); @@ -1963,10 +1967,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk) FullTransactionId xmin = hsFeedback.xmin; FullTransactionId catalog_xmin = hsFeedback.catalog_xmin; FullTransactionId next_xid = ReadNextFullTransactionId(); + /* - * Page server is updating nextXid in checkpoint each 1024 transactions, - * so feedback xmin can be actually larger then nextXid and - * function TransactionIdInRecentPast return false in this case, + * Page server is updating nextXid in checkpoint each 1024 + * transactions, so feedback xmin can be actually larger then nextXid + * and function TransactionIdInRecentPast return false in this case, * preventing update of slot's xmin. */ if (FullTransactionIdPrecedes(next_xid, xmin))