From e720b5f9d4950f6fb7c4d90cc5e0a684d240c0f4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 9 Nov 2023 18:58:19 +0200 Subject: [PATCH] Load shardmap from postgresql.conf --- pgxn/neon/control_plane_connector.c | 97 ------------------------ pgxn/neon/control_plane_connector.h | 1 - pgxn/neon/libpagestore.c | 111 +++++++++++++++++++--------- pgxn/neon/pagestore_client.h | 12 --- 4 files changed, 76 insertions(+), 145 deletions(-) diff --git a/pgxn/neon/control_plane_connector.c b/pgxn/neon/control_plane_connector.c index 99d7656dba..0a6a6d9bf5 100644 --- a/pgxn/neon/control_plane_connector.c +++ b/pgxn/neon/control_plane_connector.c @@ -225,103 +225,6 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) } -static size_t -ResponseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) -{ - appendBinaryStringInfo((StringInfo)userdata, ptr, size*nmemb); - return nmemb; -} - -void -RequestShardMapFromControlPlane(ShardMap* shard_map) -{ - shard_map->n_shards = 0; - if (!ConsoleURL) - { - elog(LOG, "ConsoleURL not set, skipping forwarding"); - return; - } - StringInfoData resp; - initStringInfo(&resp); - - curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "GET"); - curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL); - curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf); - curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ ); - curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &resp); - curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ResponseWriteCallback); - - const int num_retries = 5; - int curl_status; - - for (int i = 0; i < num_retries; i++) - { - if ((curl_status = curl_easy_perform(CurlHandle)) == CURLE_OK) - break; - elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf); - pg_usleep(1000 * 1000); - } - if (curl_status != CURLE_OK) - { - curl_easy_cleanup(CurlHandle); - elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf); - } - else - { - long response_code; 
- if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION) - { - if (response_code != 200) - { - bool error_exists = resp.len != 0; - if(error_exists) - { - elog(ERROR, - "[PG_LLM] Received HTTP code %ld from OpenAI: %s", - response_code, - resp.data); - } - else - { - elog(ERROR, - "[PG_LLM] Received HTTP code %ld from OpenAI", - response_code); - } - } - } - curl_easy_cleanup(CurlHandle); - - JsonbContainer *jsonb = (JsonbContainer *)DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(resp.data))); - JsonbValue v; - JsonbIterator *it; - JsonbIteratorToken r; - - it = JsonbIteratorInit(jsonb); - r = JsonbIteratorNext(&it, &v, true); - if (r != WJB_BEGIN_ARRAY) - elog(ERROR, "Array of connection strings expected"); - - while ((r = JsonbIteratorNext(&it, &v, true)) != WJB_DONE) - { - if (r != WJB_ELEM) - continue; - - if (shard_map->n_shards >= MAX_SHARDS) - elog(ERROR, "Too many shards"); - - if (v.type != jbvString) - elog(ERROR, "Connection string expected"); - - strncpy(shard_map->shard_connstr[shard_map->n_shards++], - v.val.string.val, - MAX_PS_CONNSTR_LEN); - } - shard_map->update_counter += 1; - pfree(resp.data); - } -} - - static void SendDeltasToControlPlane() { diff --git a/pgxn/neon/control_plane_connector.h b/pgxn/neon/control_plane_connector.h index 9c6e2b42a5..12d6a97562 100644 --- a/pgxn/neon/control_plane_connector.h +++ b/pgxn/neon/control_plane_connector.h @@ -2,6 +2,5 @@ #define CONTROL_PLANE_CONNECTOR_H void InitControlPlaneConnector(); -void RequestShardMapFromControlPlane(ShardMap* shard_map); #endif diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index d876214236..633083809c 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -66,6 +66,18 @@ static shmem_startup_hook_type prev_shmem_startup_hook; static shmem_request_hook_type prev_shmem_request_hook; #endif +#define MAX_SHARDS 128 +#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should be taken from
control plane? */ +#define MAX_PS_CONNSTR_LEN 128 + +typedef struct +{ + size_t n_shards; + size_t update_counter; + char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; +} ShardMap; + + static ShardMap* shard_map; static LWLockId shard_map_lock; static size_t shard_map_update_counter; @@ -86,6 +98,7 @@ typedef struct } PageServer; static PageServer page_servers[MAX_SHARDS]; +static shardno_t max_attached_shard_no; static void psm_shmem_startup(void) @@ -133,11 +146,66 @@ psm_init(void) #endif } +/* + * Reload the shard map if it is empty or stale (parsing the comma-separated page_server_connstring); return the number of shards and, if connstr is not NULL, copy the connection string of shard_no into it (caller's buffer must hold MAX_PS_CONNSTR_LEN bytes) + */ +static shardno_t +load_page_map(shardno_t shard_no, char* connstr) +{ + shardno_t n_shards; + + LWLockAcquire(shard_map_lock, LW_SHARED); + + while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter) + { + /* Close all existing connections */ + for (shardno_t i = 0; i < max_attached_shard_no; i++) + { + if (page_servers[i].conn) + pageserver_disconnect(i); + } + + /* Parse new shard map from page_server_connstring under exclusive lock */ + LWLockRelease(shard_map_lock); + LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); + if (shard_map->n_shards == 0) + { + char const* shard_connstr = page_server_connstring; + char const* sep; + size_t connstr_len; + do + { + sep = strchr(shard_connstr, ','); + connstr_len = sep != NULL ? 
sep - shard_connstr : strlen(shard_connstr); + if (shard_map->n_shards >= MAX_SHARDS) + elog(ERROR, "Too many shards"); + if (connstr_len >= MAX_PS_CONNSTR_LEN) + elog(ERROR, "Connection string too long"); + strlcpy(shard_map->shard_connstr[shard_map->n_shards++], shard_connstr, connstr_len+1); /* copies connstr_len chars and NUL-terminates, dropping the ',' separator */ + shard_connstr = sep != NULL ? sep + 1 : NULL; /* never do arithmetic on a NULL sep */ + } while (sep != NULL); + + shard_map->update_counter += 1; + } + shard_map_update_counter = shard_map->update_counter; + } + n_shards = shard_map->n_shards; + if (shard_no >= n_shards) + elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards); + + if (connstr) + strcpy(connstr, shard_map->shard_connstr[shard_no]); + + LWLockRelease(shard_map_lock); + + return n_shards; +} + shardno_t get_shard_number(BufferTag* tag) { - shardno_t shard_no; - uint32 hash; + shardno_t n_shards = load_page_map(0, NULL); + uint32 hash; #if PG_MAJORVERSION_NUM < 16 hash = murmurhash32(tag->rnode.spcNode); @@ -151,38 +219,7 @@ get_shard_number(BufferTag* tag) hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE)); #endif - LWLockAcquire(shard_map_lock, LW_SHARED); - while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter) - { - /* Close all existed connections */ - for (shard_no = 0; shard_no < shard_map->n_shards; shard_no++) - { - if (page_servers[shard_no].conn) - pageserver_disconnect(shard_no); - } - - /* Request new shard map from control plane under exclusive lock */ - LWLockRelease(shard_map_lock); - LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); - if (shard_map->n_shards == 0) - { - if (*page_server_connstring) - { - shard_map->n_shards = 1; - strncpy(shard_map->shard_connstr[0], page_server_connstring, sizeof shard_map->shard_connstr[0]); - } - else - { - RequestShardMapFromControlPlane(shard_map); - } - shard_map_update_counter = shard_map->update_counter; - } - } - shard_no = hash % shard_map->n_shards; - - LWLockRelease(shard_map_lock); - - return shard_no; + return hash % n_shards; }
static void @@ -210,9 +247,12 @@ pageserver_connect(shardno_t shard_no, int elevel) int n; PGconn* conn; WaitEventSet *wes; + char connstr[MAX_PS_CONNSTR_LEN]; Assert(page_servers[shard_no].conn == NULL); + (void)load_page_map(shard_no, connstr); /* refresh page map if needed */ + /* * Connect using the connection string we got from the * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment @@ -231,7 +271,7 @@ pageserver_connect(shardno_t shard_no, int elevel) n++; } keywords[n] = "dbname"; - values[n] = shard_map->shard_connstr[shard_no]; + values[n] = connstr; n++; keywords[n] = NULL; values[n] = NULL; @@ -296,6 +336,7 @@ pageserver_connect(shardno_t shard_no, int elevel) neon_log(LOG, "libpagestore: connected to '%s'", shard_map->shard_connstr[shard_no]); page_servers[shard_no].conn = conn; page_servers[shard_no].wes = wes; + max_attached_shard_no = Max(shard_no+1, max_attached_shard_no); return true; } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index f7ec50f0b8..b0d03c8cfc 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -27,18 +27,6 @@ #include "pg_config.h" -#define MAX_SHARDS 128 -#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */ -#define MAX_PS_CONNSTR_LEN 128 - -typedef struct -{ - size_t n_shards; - size_t update_counter; - char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; -} ShardMap; - - typedef enum { /* pagestore_client -> pagestore */