mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
Address review comments
This commit is contained in:
@@ -37,7 +37,6 @@
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "neon_utils.h"
|
||||
#include "control_plane_connector.h"
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
@@ -62,12 +61,31 @@ bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) =
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void AssignPageserverConnstring(const char *newval, void *extra);
|
||||
static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source);
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* ShardMap is kept in shared memory. It contains the connection strings for
|
||||
* each shard.
|
||||
*
|
||||
* There is "neon.pageserver_connstring" GUC with PGC_SIGHUP option, allowing to change it using
|
||||
* pg_reload_conf(). It is used by control plane to update shards information if page server is crashed,
|
||||
* relocated or new shards are added. This GUC variable contains comma separated list of connection strings.
|
||||
* It is copied to shared memory because config can not be loaded during query execution and we need to
|
||||
* reestablish connection to page server.
|
||||
*
|
||||
* So usually copying connection string to shared memory is done by postmaster. And other backends
|
||||
* should check the update counter to determine if the connection URL has changed and the connection needs to be reestablished.
|
||||
*
|
||||
* But at startup shared memory is not yet initialized and so we need to copy in some other process.
|
||||
* Moreover, we can not use standard Postgres LW-locks, because postmaster has no proc entry and so can not wait
|
||||
* on this primitive. This is why lockless access algorithm is implemented using two atomic counters to enforce
|
||||
* consistent reading of connection string value from shared memory.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
size_t n_shards;
|
||||
@@ -88,7 +106,7 @@ typedef struct
|
||||
PGconn *conn;
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
@@ -146,6 +164,8 @@ psm_init(void)
|
||||
|
||||
/*
|
||||
* Reload page map if needed and return number of shards and connection string for the specified shard
|
||||
* 'connstr' is an output buffer. If not NULL, it must point to a buffer at least MAX_PS_CONNSTR_LEN bytes
|
||||
* long. The connection string for the given shard is copied to it.
|
||||
*/
|
||||
static shardno_t
|
||||
load_shard_map(shardno_t shard_no, char* connstr)
|
||||
@@ -168,7 +188,14 @@ load_shard_map(shardno_t shard_no, char* connstr)
|
||||
neon_log(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
|
||||
|
||||
if (connstr)
|
||||
strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
|
||||
{
|
||||
/*
|
||||
* We need to use strlcpy here because, due to a race condition, the string in shared memory
|
||||
* may not be zero-terminated.
|
||||
*/
|
||||
strlcpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
|
||||
pg_memory_barrier();
|
||||
}
|
||||
|
||||
}
|
||||
while (begin_update_counter != end_update_counter
|
||||
@@ -546,17 +573,56 @@ check_neon_id(char **newval, void **extra, GucSource source)
|
||||
return **newval == '\0' || HexDecodeString(id, *newval, 16);
|
||||
}
|
||||
|
||||
static bool
|
||||
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
const char* shard_connstr = *newval;
|
||||
const char* sep;
|
||||
size_t connstr_len;
|
||||
int i = 0;
|
||||
do
|
||||
{
|
||||
sep = strchr(shard_connstr, ',');
|
||||
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
|
||||
if (connstr_len == 0)
|
||||
break; /* trailing comma */
|
||||
if (i >= MAX_SHARDS)
|
||||
{
|
||||
neon_log(LOG, "Too many shards");
|
||||
return false;
|
||||
}
|
||||
if (connstr_len >= MAX_PS_CONNSTR_LEN)
|
||||
{
|
||||
neon_log(LOG, "Connection string too long");
|
||||
return false;
|
||||
}
|
||||
shard_connstr = sep + 1;
|
||||
i += 1;
|
||||
} while (sep != NULL);
|
||||
|
||||
if (i == 0)
|
||||
{
|
||||
neon_log(LOG, "No shards were specified");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
{
|
||||
/*
|
||||
* Load shard map only at Postmaster.
|
||||
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
|
||||
*
|
||||
* Copying GUC value to shared memory is usually performed by postmaster. But in case of startup,
|
||||
* shared memory is not yet initialized. So it has to be performed by any other process.
|
||||
* It is not a problem if more than one process do this initialization.
|
||||
*/
|
||||
if (shard_map != NULL && UsedShmemSegAddr != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
|
||||
{
|
||||
char const* shard_connstr = newval;
|
||||
char const* sep;
|
||||
const char* shard_connstr = newval;
|
||||
const char* sep;
|
||||
size_t connstr_len;
|
||||
int i = 0;
|
||||
bool shard_map_changed = false;
|
||||
@@ -566,16 +632,8 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
|
||||
if (connstr_len == 0)
|
||||
break; /* trailing comma */
|
||||
if (i >= MAX_SHARDS)
|
||||
{
|
||||
neon_log(LOG, "Too many shards");
|
||||
return;
|
||||
}
|
||||
if (connstr_len >= MAX_PS_CONNSTR_LEN)
|
||||
{
|
||||
neon_log(LOG, "Connection string too long");
|
||||
return;
|
||||
}
|
||||
Assert(i < MAX_SHARDS);
|
||||
Assert(connstr_len < MAX_PS_CONNSTR_LEN);
|
||||
if (i >= shard_map->n_shards ||
|
||||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
|
||||
{
|
||||
@@ -590,14 +648,11 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
i += 1;
|
||||
} while (sep != NULL);
|
||||
|
||||
if (i == 0)
|
||||
{
|
||||
neon_log(LOG, "No shards were specified");
|
||||
return;
|
||||
}
|
||||
Assert(i > 0);
|
||||
if (shard_map_changed)
|
||||
{
|
||||
shard_map->n_shards = i;
|
||||
pg_memory_barrier();
|
||||
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
|
||||
}
|
||||
}
|
||||
@@ -616,7 +671,7 @@ pg_init_libpagestore(void)
|
||||
"",
|
||||
PGC_SIGHUP,
|
||||
0, /* no flags required */
|
||||
NULL, AssignPageserverConnstring, NULL);
|
||||
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.timeline_id",
|
||||
"Neon timeline_id the server is running on",
|
||||
|
||||
@@ -953,8 +953,8 @@ page_server_request(void const *req)
|
||||
|
||||
|
||||
/*
|
||||
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
|
||||
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
|
||||
* Current sharding model assumes that all metadata is present only at shard 0.
|
||||
* We still need to call get_shard_no() to check if shard map is up-to-date.
|
||||
*/
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
{
|
||||
@@ -2861,7 +2861,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
|
||||
|
||||
/*
|
||||
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
|
||||
* TODO: May be it is better to make correspondent function from freespace.c public?
|
||||
*/
|
||||
static BlockNumber
|
||||
get_fsm_physical_block(BlockNumber heapblk)
|
||||
|
||||
Reference in New Issue
Block a user