From 66fdb54d8de0dff256349f9163da4f3519344d9a Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 2 Dec 2023 07:59:57 +0200 Subject: [PATCH] Merge with #5837 --- pgxn/neon/libpagestore.c | 160 +++++++++++++++++++++---------------- pgxn/neon/pagestore_smgr.c | 2 +- 2 files changed, 90 insertions(+), 72 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index b608528af7..f29fb96b3a 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -61,6 +61,7 @@ bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = static bool pageserver_flush(shardno_t shard_no); static void pageserver_disconnect(shardno_t shard_no); +static void AssignPageserverConnstring(const char *newval, void *extra); static shmem_startup_hook_type prev_shmem_startup_hook; #if PG_VERSION_NUM>=150000 @@ -70,20 +71,19 @@ static shmem_request_hook_type prev_shmem_request_hook; typedef struct { size_t n_shards; - size_t update_counter; + pg_atomic_uint64 begin_update_counter; + pg_atomic_uint64 end_update_counter; char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; } ShardMap; static ShardMap* shard_map; -static LWLockId shard_map_lock; -static size_t shard_map_update_counter; -static bool config_updated; +static uint64 shard_map_update_counter; typedef struct { /* - * connection for each shard + * Connection for each shard */ PGconn *conn; /* @@ -112,9 +112,10 @@ psm_shmem_startup(void) shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found); if (!found) { - shard_map_lock = (LWLockId)GetNamedLWLockTranche("shard_map_lock"); shard_map->n_shards = 0; - shard_map->update_counter = 1; /* force update of shared map on forst access */ + pg_atomic_init_u64(&shard_map->begin_update_counter, 0); + pg_atomic_init_u64(&shard_map->end_update_counter, 0); + AssignPageserverConnstring(page_server_connstring, NULL); } LWLockRelease(AddinShmemInitLock); } @@ -128,7 +129,6 @@ psm_shmem_request(void) #endif RequestAddinShmemSpace(sizeof(ShardMap)); - RequestNamedLWLockTranche("shard_map_lock", 1); } static void @@ -151,74 +151,41 @@ static shardno_t load_shard_map(shardno_t shard_no, char* connstr) { shardno_t n_shards; - bool exclusive_lock = false; - LWLockAcquire(shard_map_lock, LW_SHARED); + uint64 begin_update_counter; + uint64 end_update_counter; - if (shard_map_update_counter != shard_map->update_counter || config_updated) + /* + * There is race condition here between backendc and postmaster which can update shard map. + * We recheck update couner after copying connection string to check that configuration was not changed. + */ + do { - ParseConnString: - { - char const* shard_connstr = page_server_connstring; - char const* sep; - size_t connstr_len; - int i = 0; - bool shard_map_changed = false; - do - { - sep = strchr(shard_connstr, ','); - connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); - if (connstr_len == 0) - break; /* trailing comma */ - if (i >= MAX_SHARDS) - elog(ERROR, "Too many shards"); - if (connstr_len >= MAX_PS_CONNSTR_LEN) - elog(ERROR, "Connection string too long"); - if (i >= shard_map->n_shards || - strcmp(shard_map->shard_connstr[i], shard_connstr) != 0) - { - if (!exclusive_lock) - { - /* Retry under exclsuive lock */ - LWLockRelease(shard_map_lock); - LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); - exclusive_lock = true; - goto ParseConnString; - } - shard_map_changed = true; - memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1); - } - shard_connstr = sep + 1; - i += 1; - } while (sep != NULL); + begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter); + end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter); - if (i == 0) - elog(ERROR, "No shards were speciified"); + n_shards = shard_map->n_shards; + if (shard_no >= n_shards) + elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); + + if (connstr) + strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN); - if (shard_map_changed) - { - shard_map->update_counter += 1; - shard_map->n_shards = i; - } - } - if (shard_map_update_counter != shard_map->update_counter) - { - for (shardno_t i = 0; i < max_attached_shard_no; i++) - { - if (page_servers[i].conn) - pageserver_disconnect(i); - } - shard_map_update_counter = shard_map->update_counter; - } - config_updated = false; } - n_shards = shard_map->n_shards; - if (shard_no >= n_shards) - elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); + while (begin_update_counter != end_update_counter + || begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter) + || end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter)); - if (connstr) - strcpy(connstr, shard_map->shard_connstr[shard_no]); - - LWLockRelease(shard_map_lock); + if (shard_map_update_counter != end_update_counter) + { + /* Reset all connections if connection strings are changed */ + for (shardno_t i = 0; i < max_attached_shard_no; i++) + { + if (page_servers[i].conn) + pageserver_disconnect(i); + } + max_attached_shard_no = 0; + shard_map_update_counter = end_update_counter; + } return n_shards; } @@ -577,7 +544,58 @@ check_neon_id(char **newval, void **extra, GucSource source) static void AssignPageserverConnstring(const char *newval, void *extra) { - config_updated = true; + /* + * Load shard map only at Postmaster. + * If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop + */ + if (shard_map != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0)) + { + char const* shard_connstr = newval; + char const* sep; + size_t connstr_len; + int i = 0; + bool shard_map_changed = false; + do + { + sep = strchr(shard_connstr, ','); + connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); + if (connstr_len == 0) + break; /* trailing comma */ + if (i >= MAX_SHARDS) + { + elog(LOG, "Too many shards"); + return; + } + if (connstr_len >= MAX_PS_CONNSTR_LEN) + { + elog(LOG, "Connection string too long"); + return; + } + if (i >= shard_map->n_shards || + strcmp(shard_map->shard_connstr[i], shard_connstr) != 0) + { + if (!shard_map_changed) + { + pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1); + shard_map_changed = true; + } + memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1); + } + shard_connstr = sep + 1; + i += 1; + } while (sep != NULL); + + if (i == 0) + { + elog(LOG, "No shards were specified"); + return; + } + if (shard_map_changed) + { + shard_map->n_shards = i; + pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1); + } + } } /* diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index b6ac759d96..b581c89c81 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -920,7 +920,7 @@ page_server_request(void const *req) shard_no = get_shard_number(&tag); /* - * TODO: temporary workarround - we stream all WAL only to shard 0 so metadata and forks other than main + * TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main * should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date */ if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)