Load shardmap from postgresql.conf

This commit is contained in:
Konstantin Knizhnik
2023-11-09 18:58:19 +02:00
parent 039fa60446
commit e720b5f9d4
4 changed files with 76 additions and 145 deletions

View File

@@ -225,103 +225,6 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
}
static size_t
ResponseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
appendBinaryStringInfo((StringInfo)userdata, ptr, size*nmemb);
return nmemb;
}
void
RequestShardMapFromControlPlane(ShardMap* shard_map)
{
shard_map->n_shards = 0;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
StringInfoData resp;
initStringInfo(&resp);
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &resp);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ResponseWriteCallback);
const int num_retries = 5;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == CURLE_OK)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != CURLE_OK)
{
curl_easy_cleanup(CurlHandle);
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
if (response_code != 200)
{
bool error_exists = resp.len != 0;
if(error_exists)
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI: %s",
response_code,
resp.data);
}
else
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI",
response_code);
}
}
}
curl_easy_cleanup(CurlHandle);
JsonbContainer *jsonb = (JsonbContainer *)DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(resp.data)));
JsonbValue v;
JsonbIterator *it;
JsonbIteratorToken r;
it = JsonbIteratorInit(jsonb);
r = JsonbIteratorNext(&it, &v, true);
if (r != WJB_BEGIN_ARRAY)
elog(ERROR, "Array of connection strings expected");
while ((r = JsonbIteratorNext(&it, &v, true)) != WJB_DONE)
{
if (r != WJB_ELEM)
continue;
if (shard_map->n_shards >= MAX_SHARDS)
elog(ERROR, "Too many shards");
if (v.type != jbvString)
elog(ERROR, "Connection string expected");
strncpy(shard_map->shard_connstr[shard_map->n_shards++],
v.val.string.val,
MAX_PS_CONNSTR_LEN);
}
shard_map->update_counter += 1;
pfree(resp.data);
}
}
static void
SendDeltasToControlPlane()
{

View File

@@ -2,6 +2,5 @@
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
void RequestShardMapFromControlPlane(ShardMap* shard_map);
#endif

View File

@@ -66,6 +66,18 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif
#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */
#define MAX_PS_CONNSTR_LEN 128
typedef struct
{
size_t n_shards;
size_t update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
static ShardMap* shard_map;
static LWLockId shard_map_lock;
static size_t shard_map_update_counter;
@@ -86,6 +98,7 @@ typedef struct
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static shardno_t max_attached_shard_no;
static void
psm_shmem_startup(void)
@@ -133,11 +146,66 @@ psm_init(void)
#endif
}
/*
* Reload page map if needed and return number of shards and connection string for the specified shard
*/
static shardno_t
load_page_map(shardno_t shard_no, char* connstr)
{
shardno_t n_shards;
LWLockAcquire(shard_map_lock, LW_SHARED);
while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter)
{
/* Close all existed connections */
for (shardno_t i = 0; i < max_attached_shard_no; i++)
{
if (page_servers[i].conn)
pageserver_disconnect(i);
}
/* Request new shard map from control plane under exclusive lock */
LWLockRelease(shard_map_lock);
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
if (shard_map->n_shards == 0)
{
char const* shard_connstr = page_server_connstring;
char const* sep;
size_t connstr_len;
do
{
sep = strchr(shard_connstr, ',');
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
if (shard_map->n_shards >= MAX_SHARDS)
elog(ERROR, "Too many shards");
if (connstr_len >= MAX_PS_CONNSTR_LEN)
elog(ERROR, "Connection string too long");
memcpy(shard_map->shard_connstr[shard_map->n_shards++], shard_connstr, connstr_len+1);
shard_connstr = sep + 1;
} while (sep != NULL);
shard_map->update_counter += 1;
}
shard_map_update_counter = shard_map->update_counter;
}
n_shards = shard_map->n_shards;
if (shard_no >= n_shards)
elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
if (connstr)
strcpy(connstr, shard_map->shard_connstr[shard_no]);
LWLockRelease(shard_map_lock);
return n_shards;
}
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t shard_no;
uint32 hash;
shardno_t n_shards = load_page_map(0, NULL);
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.spcNode);
@@ -151,38 +219,7 @@ get_shard_number(BufferTag* tag)
hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE));
#endif
LWLockAcquire(shard_map_lock, LW_SHARED);
while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter)
{
/* Close all existed connections */
for (shard_no = 0; shard_no < shard_map->n_shards; shard_no++)
{
if (page_servers[shard_no].conn)
pageserver_disconnect(shard_no);
}
/* Request new shard map from control plane under exclusive lock */
LWLockRelease(shard_map_lock);
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
if (shard_map->n_shards == 0)
{
if (*page_server_connstring)
{
shard_map->n_shards = 1;
strncpy(shard_map->shard_connstr[0], page_server_connstring, sizeof shard_map->shard_connstr[0]);
}
else
{
RequestShardMapFromControlPlane(shard_map);
}
shard_map_update_counter = shard_map->update_counter;
}
}
shard_no = hash % shard_map->n_shards;
LWLockRelease(shard_map_lock);
return shard_no;
return hash % n_shards;
}
static void
@@ -210,9 +247,12 @@ pageserver_connect(shardno_t shard_no, int elevel)
int n;
PGconn* conn;
WaitEventSet *wes;
char connstr[MAX_PS_CONNSTR_LEN];
Assert(page_servers[shard_no].conn == NULL);
(void)load_page_map(shard_no, connstr); /* refresh page map if needed */
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
@@ -231,7 +271,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = shard_map->shard_connstr[shard_no];
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
@@ -296,6 +336,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
neon_log(LOG, "libpagestore: connected to '%s'", shard_map->shard_connstr[shard_no]);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
return true;
}

View File

@@ -27,18 +27,6 @@
#include "pg_config.h"
#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */
#define MAX_PS_CONNSTR_LEN 128
typedef struct
{
size_t n_shards;
size_t update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
typedef enum
{
/* pagestore_client -> pagestore */