diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index acb8092990..caffdc9612 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -178,6 +178,8 @@ static PageServer page_servers[MAX_SHARDS]; static bool pageserver_flush(shardno_t shard_no); static void pageserver_disconnect(shardno_t shard_no); static void pageserver_disconnect_shard(shardno_t shard_no); +// HADRON +shardno_t get_num_shards(void); static bool PagestoreShmemIsValid(void) @@ -286,6 +288,22 @@ AssignPageserverConnstring(const char *newval, void *extra) } } +/* BEGIN_HADRON */ +/** + * Return the total number of shards seen in the shard map. + */ +shardno_t get_num_shards(void) +{ + const ShardMap *shard_map; + + Assert(pagestore_shared); + shard_map = &pagestore_shared->shard_map; + + Assert(shard_map != NULL); + return shard_map->num_shards; +} +/* END_HADRON */ + /* * Get the current number of shards, and/or the connection string for a * particular shard from the shard map in shared memory. diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 93807be8c2..d43d372c2e 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -110,6 +110,9 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk); static void CheckGracefulShutdown(WalProposer *wp); +// HADRON +shardno_t get_num_shards(void); + static void init_walprop_config(bool syncSafekeepers) { @@ -646,18 +649,19 @@ walprop_pg_get_shmem_state(WalProposer *wp) * Record new ps_feedback in the array with shards and update min_feedback. */ static PageserverFeedback -record_pageserver_feedback(PageserverFeedback *ps_feedback) +record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards) { PageserverFeedback min_feedback; Assert(ps_feedback->present); Assert(ps_feedback->shard_number < MAX_SHARDS); + Assert(ps_feedback->shard_number < num_shards); SpinLockAcquire(&walprop_shared->mutex); - /* Update the number of shards */ - if (ps_feedback->shard_number + 1 > walprop_shared->num_shards) - walprop_shared->num_shards = ps_feedback->shard_number + 1; + // Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive + // a new pageserver feedback. + walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards); /* Update the feedback */ memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback)); @@ -2023,19 +2027,43 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk) if (wp->config->syncSafekeepers) return; + /* handle fresh ps_feedback */ if (sk->appendResponse.ps_feedback.present) { - PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback); + shardno_t num_shards = get_num_shards(); - /* Only one main shard sends non-zero currentClusterSize */ - if (sk->appendResponse.ps_feedback.currentClusterSize > 0) - SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize); - - if (min_feedback.disk_consistent_lsn != standby_apply_lsn) + // During shard split, we receive ps_feedback from child shards before + // the split commits and our shard map GUC has been updated. We must + // filter out such feedback here because record_pageserver_feedback() + // doesn't do it. + // + // NB: what we would actually want to happen is that we only receive + // ps_feedback from the parent shards when the split is committed, then + // apply the split to our set of tracked feedback and from here on only + // receive ps_feedback from child shards. This filter condition doesn't + // do that: if we split from N parent to 2N child shards, the first N + // child shards' feedback messages will pass this condition, even before + // the split is committed. That's a bit sloppy, but OK for now. + if (sk->appendResponse.ps_feedback.shard_number < num_shards) { - standby_apply_lsn = min_feedback.disk_consistent_lsn; - needToAdvanceSlot = true; + PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback, num_shards); + + /* Only one main shard sends non-zero currentClusterSize */ + if (sk->appendResponse.ps_feedback.currentClusterSize > 0) + SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize); + + if (min_feedback.disk_consistent_lsn != standby_apply_lsn) + { + standby_apply_lsn = min_feedback.disk_consistent_lsn; + needToAdvanceSlot = true; + } + } + else + { + // HADRON + elog(DEBUG2, "Ignoring pageserver feedback for unknown shard %d (current shard number %d)", + sk->appendResponse.ps_feedback.shard_number, num_shards); } }