Add support for PS sharding in compute (#6205)

Refs #5508.

Replaces #5837.

## Problem

This PR implements sharding support on the compute side. Relations are
split into stripes, and `get_page` requests are routed to the particular
shard that holds the stripe. All other requests (e.g. get relation or
database size) are always sent to shard 0.
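Concretely, a page is mapped to a shard by hashing the relation id together with the page's stripe number (`blockNum / stripe_size`) and taking the result modulo the number of shards, as in `get_shard_number()` in the diff below. A minimal stand-alone sketch of this routing rule (the stand-in hash and all names here are illustrative; the extension itself uses PostgreSQL's `murmurhash32()` and `hash_combine()`):

```c
#include <stdint.h>
#include <stdio.h>

/* Stand-in for PostgreSQL's murmurhash32() (the murmur3 finalizer). */
static uint32_t mix32(uint32_t x)
{
    x ^= x >> 16; x *= 0x85ebca6b;
    x ^= x >> 13; x *= 0xc2b2ae35;
    return x ^ (x >> 16);
}

/* Route a (relation, block) pair to a shard: hash the relation id,
 * combine it with the stripe number, take the result modulo the shard count. */
static uint32_t shard_for(uint32_t rel_id, uint32_t block_num,
                          uint32_t stripe_size, uint32_t num_shards)
{
    uint32_t h = mix32(rel_id);
    /* boost-style hash_combine, standing in for PostgreSQL's */
    h ^= mix32(block_num / stripe_size) + 0x9e3779b9 + (h << 6) + (h >> 2);
    return h % num_shards;
}

int main(void)
{
    /* With a 32768-block stripe, blocks 0..32767 share a shard,
     * 32768..65535 share a (generally different) one, and so on. */
    for (uint32_t blk = 0; blk < 4 * 32768; blk += 32768)
        printf("block %6u -> shard %u\n", blk, shard_for(16384, blk, 32768, 4));
    return 0;
}
```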

## Summary of changes

Sharding support on the compute side consists of three things:
1. Make it possible to specify, and change at runtime, connections to more
than one pageserver (see the example value below)
2. Send each `get_page` request to the particular shard, determined by a
hash of the page key
3. Support multiple pageservers in prefetch ring requests
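For illustration, a three-shard `neon.pageserver_connstring` value is simply a comma-separated list of per-shard connection strings, which the new `ParseShardMap()` splits into a `ShardMap` (hostnames here are hypothetical):

```
neon.pageserver_connstring = 'postgresql://ps0.example:6400,postgresql://ps1.example:6400,postgresql://ps2.example:6400'
```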


---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: John Spray <john@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>


@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/xlog.h"
#include "common/hashfn.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "libpq/libpq.h"
@@ -38,17 +39,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -59,17 +49,25 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
typedef struct
{
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
size_t num_shards;
} ShardMap;
/*
* PagestoreShmemState is kept in shared memory. It contains the connection
* strings for each shard.
*
* The "neon.pageserver_connstring" GUC is marked with the PGC_SIGHUP option,
* allowing it to be changed using pg_reload_conf(). The control plane can
* update the connection string if the pageserver crashes, is relocated, or
* new shards are added. A parsed copy of the current value of the GUC is kept
* in shared memory, updated by the postmaster, because regular backends don't
* reload the config during query execution, but we might need to re-establish
* the pageserver connection with the new connection string even in the middle
* of a query.
@@ -84,7 +82,7 @@ typedef struct
{
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
ShardMap shard_map;
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
@@ -94,10 +92,25 @@ static void walproposer_shmem_request(void);
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
/* This backend's per-shard connections */
typedef struct
{
PGconn *conn;
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static bool
PagestoreShmemIsValid(void)
@@ -105,89 +118,213 @@ PagestoreShmemIsValid(void)
return pagestore_shared && UsedShmemSegAddr;
}
/*
* Parse a comma-separated list of connection strings into a ShardMap.
*
* If 'result' is NULL, just checks that the input is valid. If the input is
* not valid, returns false. The contents of *result are undefined in
* that case, and must not be relied on.
*/
static bool
ParseShardMap(const char *connstr, ShardMap *result)
{
const char *p;
int nshards = 0;
if (result)
memset(result, 0, sizeof(ShardMap));
p = connstr;
nshards = 0;
for (;;)
{
const char *sep;
size_t connstr_len;
sep = strchr(p, ',');
connstr_len = sep != NULL ? sep - p : strlen(p);
if (connstr_len == 0 && sep == NULL)
break; /* ignore trailing comma */
if (nshards >= MAX_SHARDS)
{
neon_log(LOG, "Too many shards");
return false;
}
if (connstr_len >= MAX_PAGESERVER_CONNSTRING_SIZE)
{
neon_log(LOG, "Connection string too long");
return false;
}
if (result)
{
memcpy(result->connstring[nshards], p, connstr_len);
result->connstring[nshards][connstr_len] = '\0';
}
nshards++;
if (sep == NULL)
break;
p = sep + 1;
}
if (result)
result->num_shards = nshards;
return true;
}
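/*
 * For example, the (hypothetical) value
 * "postgresql://ps0:6400,postgresql://ps1:6400" parses to num_shards = 2
 * with connstring[0] = "postgresql://ps0:6400" and
 * connstring[1] = "postgresql://ps1:6400". The split is on every comma, so
 * the individual connection strings must not themselves contain commas.
 */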
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return ParseShardMap(p, NULL);
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
ShardMap shard_map;
/*
* Only postmaster updates the copy in shared memory.
*/
if (!PagestoreShmemIsValid() || IsUnderPostmaster)
return;
if (!ParseShardMap(newval, &shard_map))
{
/*
* shouldn't happen, because we already checked the value in
* CheckPageserverConnstring
*/
elog(ERROR, "could not parse shard map");
}
if (memcmp(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap)) != 0)
{
pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1);
pg_write_barrier();
memcpy(&pagestore_shared->shard_map, &shard_map, sizeof(ShardMap));
pg_write_barrier();
pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1);
}
else
{
/* no change */
}
}
/*
* Get the current number of shards, and/or the connection string for a
* particular shard from the shard map in shared memory.
*
* If num_shards_p is not NULL, it is set to the current number of shards.
*
* If connstr_p is not NULL, the connection string for 'shard_no' is copied to
* it. It must point to a buffer at least MAX_PAGESERVER_CONNSTRING_SIZE bytes
* long.
*
* As a side-effect, if the shard map in shared memory had changed since the
* last call, terminates all existing connections to all pageservers.
*/
static void
load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
{
uint64 begin_update_counter;
uint64 end_update_counter;
if (!PagestoreShmemIsValid())
return;
ShardMap *shard_map = &pagestore_shared->shard_map;
shardno_t num_shards;
/*
* Postmaster can update the shared memory values concurrently, in which
* case we would copy a garbled mix of the old and new values. We will
* detect it because the counters won't match, and retry. But it's
* important that we don't do anything within the retry-loop that would
* depend on the string having valid contents.
*/
do
{
begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);
pg_read_barrier();
num_shards = shard_map->num_shards;
if (connstr_p && shard_no < MAX_SHARDS)
strlcpy(connstr_p, shard_map->connstring[shard_no], MAX_PAGESERVER_CONNSTRING_SIZE);
pg_memory_barrier();
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));
if (connstr_p && shard_no >= num_shards)
neon_log(ERROR, "Shard %d is greater than or equal to the number of shards %d",
shard_no, num_shards);
/*
* If any of the connection strings changed, reset all connections.
*/
if (pagestore_local_counter != end_update_counter)
{
for (shardno_t i = 0; i < MAX_SHARDS; i++)
{
if (page_servers[i].conn)
pageserver_disconnect(i);
}
pagestore_local_counter = end_update_counter;
}
if (num_shards_p)
*num_shards_p = num_shards;
}
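/*
 * The begin/end counters above act as a seqlock: the writer (postmaster, in
 * AssignPageserverConnstring) bumps begin_update_counter, copies the new
 * map, then bumps end_update_counter; readers retry until both counters
 * match and are unchanged across the copy.
 */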
#define MB (1024*1024)
shardno_t
get_shard_number(BufferTag *tag)
{
shardno_t n_shards;
uint32 hash;
load_shard_map(0, NULL, &n_shards);
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.relNode);
hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size));
#else
hash = murmurhash32(tag->relNumber);
hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size));
#endif
return hash % n_shards;
}
static bool
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn *conn;
WaitEventSet *wes;
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
static TimestampTz last_connect_time = 0;
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
TimestampTz now;
uint64_t us_since_last_connect;
Assert(page_servers[shard_no].conn == NULL);
/*
* Get the connection string for this shard. If the shard map has been
* updated since we last looked, this will also disconnect any existing
* pageserver connections as a side effect.
*/
load_shard_map(shard_no, connstr, NULL);
now = GetCurrentTimestamp();
us_since_last_connect = now - last_connect_time;
@@ -223,76 +360,84 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(conn, query);
pfree(query);
if (ret != 1)
{
PQfinish(conn);
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
return false;
}
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
PG_TRY();
{
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(conn);
FreeWaitEventSet(wes);
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg);
return false;
}
}
}
}
PG_CATCH();
{
PQfinish(conn);
FreeWaitEventSet(wes);
PG_RE_THROW();
}
PG_END_TRY();
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
return true;
}
@@ -300,9 +445,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn *pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -312,7 +458,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -324,7 +470,7 @@ retry:
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg);
neon_shard_log(shard_no, LOG, "could not get response from pageserver: %s", msg);
pfree(msg);
return -1;
}
@@ -338,7 +484,7 @@ retry:
static void
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -347,38 +493,38 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*/
prefetch_on_ps_disconnect();
}
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(shardno_t shard_no, NeonRequest *request)
{
StringInfoData req_buff;
PGconn *pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -392,9 +538,9 @@ pageserver_send(NeonRequest *request)
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
* connection in case of failure.
*/
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
@@ -402,6 +548,8 @@ pageserver_send(NeonRequest *request)
n_reconnect_attempts = 0;
}
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
@@ -414,8 +562,8 @@ pageserver_send(NeonRequest *request)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
return false;
@@ -427,19 +575,20 @@ pageserver_send(NeonRequest *request)
{
char *msg = nm_to_string((NeonMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
return true;
}
static NeonResponse *
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
PGconn *pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -447,7 +596,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -459,33 +608,33 @@ pageserver_receive(void)
{
char *msg = nm_to_string((NeonMessage *) resp);
neon_log(PageStoreTrace, "got response: %s", msg);
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
pfree(msg);
}
}
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -495,11 +644,13 @@ pageserver_receive(void)
static bool
pageserver_flush(shardno_t shard_no)
{
PGconn *pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
}
else
{
@@ -507,8 +658,8 @@ pageserver_flush(void)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
}
@@ -550,6 +701,7 @@ PagestoreShmemInit(void)
{
pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0);
pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
@@ -624,6 +776,15 @@ pg_init_libpagestore(void)
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding stripe size",
NULL,
&stripe_size,
32768, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_BLOCKS,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit",
NULL,


@@ -20,9 +20,13 @@
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "storage/block.h"
#include "storage/buf_internals.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#define MAX_SHARDS 128
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
typedef enum
{
/* pagestore_client -> pagestore */
@@ -51,6 +55,9 @@ typedef struct
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/*
* supertype of all the Neon*Request structs below
@@ -141,11 +148,13 @@ extern char *nm_to_string(NeonMessage *msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -159,6 +168,8 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);


@@ -172,6 +172,7 @@ typedef struct PrefetchRequest
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -239,10 +240,17 @@ typedef struct PrefetchState
* also unused */
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
/* Mark shards involved in prefetch */
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
#define BITMAP_ISSET(bm, bit) ((bm)[(bit) >> 3] & (1 << ((bit) & 7)))
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
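/* Example: BITMAP_SET(bm, 10) sets bit 10 & 7 = 2 of byte 10 >> 3 = 1. */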
static PrefetchState *MyPState;
#define GetPrfSlot(ring_index) ( \
@@ -327,6 +335,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -494,6 +503,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
{
if (!page_server->flush(shard_no))
return false;
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
}
}
MyPState->max_shard_no = 0;
return true;
}
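/*
 * Only shards that have had prefetch requests issued since the last flush
 * (tracked in shard_bitmap) are flushed; the bitmap is cleared per shard as
 * each flush succeeds, and max_shard_no is reset afterwards.
 */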
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -509,7 +535,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -547,7 +573,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -704,12 +730,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -880,6 +908,7 @@ Retry:
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -890,7 +919,7 @@ Retry:
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!prefetch_flush_requests())
{
/*
* Prefetch set is reset in case of error, so we should try to
@@ -908,13 +937,44 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse *resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
/*
* The current sharding model assumes that all metadata is present only on
* shard 0. We still need to call get_shard_number() to check that the
* shard map is up-to-date.
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do
{
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;
@@ -2098,8 +2158,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
blkno,
errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no, blkno,
RelFileInfoFmt(rinfo),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),