Add neon.protocol_version GUC

This commit is contained in:
Konstantin Knizhnik
2024-03-20 15:34:34 +02:00
parent 15c0e1351a
commit f07c33186a
5 changed files with 50 additions and 14 deletions

View File

@@ -980,17 +980,24 @@ impl PagestreamFeMessage {
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let mut msg_tag = body.read_u8()?;
//
// Old version of protocol use commands with tags started with 0 and containing `latest` flag.
// New version of protocol shift command tags by 10 and pass LSN range instead of `latest` flag.
// Server should be able to handle both protocol version. As far as we are not passing no=w,
// protocol version from client to server, we make a decision based on tag range.
// So this code actually provides backward compatibility.
//
let horizon = if msg_tag >= 10 {
// new protocol
msg_tag -= 10;
msg_tag -= 10; // commands tags in new protocol starts with 10
Lsn::from(body.read_u64::<BigEndian>()?)
} else {
// old_protocol
let latest = body.read_u8()? != 0;
if latest {
Lsn::MAX
Lsn::MAX // get latest version
} else {
Lsn::INVALID
Lsn::INVALID // get version on specified LSN
}
};
let lsn = Lsn::from(body.read_u64::<BigEndian>()?);

View File

@@ -49,6 +49,8 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -844,6 +846,14 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
DefineCustomIntVariable("neon.protocol_version",
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
NEON_PROTOCOL_VERSION, 1, INT_MAX,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -28,6 +28,13 @@
#define MAX_SHARDS 128
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
/*
* Right now protocal version is not set to the server.
* So it is ciritical that format of existed commands is not changed.
* New protocl versions can just add new commands.
*/
#define NEON_PROTOCOL_VERSION 2
typedef enum
{
/* pagestore_client -> pagestore */
@@ -193,6 +200,7 @@ extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern shardno_t get_shard_number(BufferTag* tag);

View File

@@ -1013,7 +1013,19 @@ nm_pack_request(NeonRequest *msg)
StringInfoData s;
initStringInfo(&s);
pq_sendbyte(&s, msg->tag);
if (neon_protocol_version >= 2)
{
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->horizon);
}
else
{
/* Old protocol with latest flag */
pq_sendbyte(&s, msg->tag - T_NeonExistsRequest); /* old protocol command tags start from zero */
pq_sendbyte(&s, msg->horizon == MAX_LSN);
}
pq_sendint64(&s, msg->lsn);
switch (messageTag(msg))
{
@@ -1022,8 +1034,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1035,8 +1045,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1048,8 +1056,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
break;
@@ -1058,8 +1064,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1073,8 +1077,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);

View File

@@ -0,0 +1,9 @@
from fixtures.neon_fixtures import NeonEnv
def test_protocol_version(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main", config_lines=["neon.protocol_version=1"])
cur = endpoint.connect().cursor()
cur.execute("show neon.protocol_version")
assert cur.fetchone() == ('1',)