diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c0ac5c27d0..5411d26faa 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -70,20 +70,18 @@ static shmem_request_hook_type prev_shmem_request_hook; typedef struct { size_t n_shards; - size_t update_counter; + pg_atomic_uint64 update_counter; char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; } ShardMap; static ShardMap* shard_map; -static LWLockId shard_map_lock; -static size_t shard_map_update_counter; -static bool config_updated; +static uint64 shard_map_update_counter; typedef struct { /* - * connection for each shard + * Connection for each shard */ PGconn *conn; /* @@ -112,9 +110,8 @@ psm_shmem_startup(void) shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found); if (!found) { - shard_map_lock = (LWLockId)GetNamedLWLockTranche("shard_map_lock"); shard_map->n_shards = 0; - shard_map->update_counter = 1; /* force update of shared map on forst access */ + pg_atomic_init_u64(&shard_map->update_counter, 0); } LWLockRelease(AddinShmemInitLock); } @@ -128,7 +125,6 @@ psm_shmem_request(void) #endif RequestAddinShmemSpace(sizeof(ShardMap)); - RequestNamedLWLockTranche("shard_map_lock", 1); } static void @@ -151,75 +147,32 @@ static shardno_t load_shard_map(shardno_t shard_no, char* connstr) { shardno_t n_shards; - bool exclusive_lock = false; - LWLockAcquire(shard_map_lock, LW_SHARED); + uint64 update_counter = pg_atomic_read_u64(&shard_map->update_counter); - if (shard_map_update_counter != shard_map->update_counter || config_updated) + /* + * There is race condition here between backendc and postmaster which can update shard map. + * We recheck update couner after copying connection string to check that configuration was not changed. + */ + while (shard_map_update_counter != update_counter) { - ParseConnString: + /* Reset all connections */ + for (shardno_t i = 0; i < max_attached_shard_no; i++) { - char const* shard_connstr = page_server_connstring; - char const* sep; - size_t connstr_len; - int i = 0; - bool shard_map_changed = false; - do - { - sep = strchr(shard_connstr, ','); - connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); - if (connstr_len == 0) - break; /* trailing comma */ - if (i >= MAX_SHARDS) - elog(ERROR, "Too many shards"); - if (connstr_len >= MAX_PS_CONNSTR_LEN) - elog(ERROR, "Connection string too long"); - if (i >= shard_map->n_shards || - strcmp(shard_map->shard_connstr[i], shard_connstr) != 0) - { - if (!exclusive_lock) - { - /* Retry under exclsuive lock */ - LWLockRelease(shard_map_lock); - LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); - exclusive_lock = true; - goto ParseConnString; - } - shard_map_changed = true; - memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1); - } - shard_connstr = sep + 1; - i += 1; - } while (sep != NULL); - - if (i == 0) - elog(ERROR, "No shards were specified"); - - if (shard_map_changed) - { - shard_map->update_counter += 1; - shard_map->n_shards = i; - } + if (page_servers[i].conn) + pageserver_disconnect(i); } - if (shard_map_update_counter != shard_map->update_counter) - { - for (shardno_t i = 0; i < max_attached_shard_no; i++) - { - if (page_servers[i].conn) - pageserver_disconnect(i); - } - shard_map_update_counter = shard_map->update_counter; - } - config_updated = false; + max_attached_shard_no = 0; + + shard_map_update_counter = update_counter; + n_shards = shard_map->n_shards; + if (shard_no >= n_shards) + elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); + + if (connstr) + strcpy(connstr, shard_map->shard_connstr[shard_no]); + + update_counter = pg_atomic_read_u64(&shard_map->update_counter); } - n_shards = shard_map->n_shards; - if (shard_no >= n_shards) - elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); - - if (connstr) - strcpy(connstr, shard_map->shard_connstr[shard_no]); - - LWLockRelease(shard_map_lock); - return n_shards; } @@ -582,6 +535,59 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } +static void +AssignPageserverConnstring(const char *newval, void *extra) +{ + /* + * Load shard map only at Postmaster. + * If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop + */ + if (MyProcPid == PostmasterPid) + { + char const* shard_connstr = newval; + char const* sep; + size_t connstr_len; + int i = 0; + bool shard_map_changed = false; + do + { + sep = strchr(shard_connstr, ','); + connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr); + if (connstr_len == 0) + break; /* trailing comma */ + if (i >= MAX_SHARDS) + { + elog(LOG, "Too many shards"); + return; + } + if (connstr_len >= MAX_PS_CONNSTR_LEN) + { + elog(LOG, "Connection string too long"); + return; + } + if (i >= shard_map->n_shards || + strcmp(shard_map->shard_connstr[i], shard_connstr) != 0) + { + shard_map_changed = true; + memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1); + } + shard_connstr = sep + 1; + i += 1; + } while (sep != NULL); + + if (i == 0) + { + elog(LOG, "No shards were specified"); + return; + } + if (shard_map_changed) + { + shard_map->n_shards = i; + pg_atomic_add_fetch_u64(&shard_map->update_counter, 1); + } + } +} + /* * Module initialization function */