From f07c33186a0810e454440957405612be4cba96cf Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 20 Mar 2024 15:34:34 +0200 Subject: [PATCH] Add neon.protocol_version GUC --- libs/pageserver_api/src/models.rs | 13 ++++++++--- pgxn/neon/libpagestore.c | 10 ++++++++ pgxn/neon/pagestore_client.h | 8 +++++++ pgxn/neon/pagestore_smgr.c | 24 +++++++++++--------- test_runner/regress/test_protocol_version.py | 9 ++++++++ 5 files changed, 50 insertions(+), 14 deletions(-) create mode 100644 test_runner/regress/test_protocol_version.py diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 80026e13f0..f8e6328588 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -980,17 +980,24 @@ impl PagestreamFeMessage { // TODO: consider using protobuf or serde bincode for less error prone // serialization. let mut msg_tag = body.read_u8()?; + // + // Old version of protocol use commands with tags started with 0 and containing `latest` flag. + // New version of protocol shift command tags by 10 and pass LSN range instead of `latest` flag. + // Server should be able to handle both protocol version. As far as we are not passing no=w, + // protocol version from client to server, we make a decision based on tag range. + // So this code actually provides backward compatibility. + // let horizon = if msg_tag >= 10 { // new protocol - msg_tag -= 10; + msg_tag -= 10; // commands tags in new protocol starts with 10 Lsn::from(body.read_u64::()?) } else { // old_protocol let latest = body.read_u8()? != 0; if latest { - Lsn::MAX + Lsn::MAX // get latest version } else { - Lsn::INVALID + Lsn::INVALID // get version on specified LSN } }; let lsn = Lsn::from(body.read_u64::()?); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 2276b4e807..edf9a8b5e3 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -49,6 +49,8 @@ char *neon_auth_token; int readahead_buffer_size = 128; int flush_every_n_requests = 8; +int neon_protocol_version; + static int n_reconnect_attempts = 0; static int max_reconnect_attempts = 60; static int stripe_size; @@ -844,6 +846,14 @@ pg_init_libpagestore(void) PGC_USERSET, 0, /* no flags required */ NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL); + DefineCustomIntVariable("neon.protocol_version", + "Version of compute<->page server protocol", + NULL, + &neon_protocol_version, + NEON_PROTOCOL_VERSION, 1, INT_MAX, + PGC_USERSET, + 0, /* no flags required */ + NULL, NULL, NULL); relsize_hash_init(); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 4e4315c433..a0e053886e 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -28,6 +28,13 @@ #define MAX_SHARDS 128 #define MAX_PAGESERVER_CONNSTRING_SIZE 256 +/* + * Right now protocal version is not set to the server. + * So it is ciritical that format of existed commands is not changed. + * New protocl versions can just add new commands. + */ +#define NEON_PROTOCOL_VERSION 2 + typedef enum { /* pagestore_client -> pagestore */ @@ -193,6 +200,7 @@ extern int readahead_buffer_size; extern char *neon_timeline; extern char *neon_tenant; extern int32 max_cluster_size; +extern int neon_protocol_version; extern shardno_t get_shard_number(BufferTag* tag); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 8b32224796..34cbf7c4b9 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1013,7 +1013,19 @@ nm_pack_request(NeonRequest *msg) StringInfoData s; initStringInfo(&s); - pq_sendbyte(&s, msg->tag); + + if (neon_protocol_version >= 2) + { + pq_sendbyte(&s, msg->tag); + pq_sendint64(&s, msg->horizon); + } + else + { + /* Old protocol with latest flag */ + pq_sendbyte(&s, msg->tag - T_NeonExistsRequest); /* old protocol command tags start from zero */ + pq_sendbyte(&s, msg->horizon == MAX_LSN); + } + pq_sendint64(&s, msg->lsn); switch (messageTag(msg)) { @@ -1022,8 +1034,6 @@ nm_pack_request(NeonRequest *msg) { NeonExistsRequest *msg_req = (NeonExistsRequest *) msg; - pq_sendint64(&s, msg_req->req.horizon); - pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); @@ -1035,8 +1045,6 @@ nm_pack_request(NeonRequest *msg) { NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg; - pq_sendint64(&s, msg_req->req.horizon); - pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); @@ -1048,8 +1056,6 @@ nm_pack_request(NeonRequest *msg) { NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; - pq_sendint64(&s, msg_req->req.horizon); - pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, msg_req->dbNode); break; @@ -1058,8 +1064,6 @@ nm_pack_request(NeonRequest *msg) { NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; - pq_sendint64(&s, msg_req->req.horizon); - pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo)); @@ -1073,8 +1077,6 @@ nm_pack_request(NeonRequest *msg) { NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; - pq_sendint64(&s, msg_req->req.horizon); - pq_sendint64(&s, msg_req->req.lsn); pq_sendbyte(&s, msg_req->kind); pq_sendint32(&s, msg_req->segno); diff --git a/test_runner/regress/test_protocol_version.py b/test_runner/regress/test_protocol_version.py new file mode 100644 index 0000000000..af1ca756dd --- /dev/null +++ b/test_runner/regress/test_protocol_version.py @@ -0,0 +1,9 @@ +from fixtures.neon_fixtures import NeonEnv + + +def test_protocol_version(neon_simple_env: NeonEnv): + env = neon_simple_env + endpoint = env.endpoints.create_start("main", config_lines=["neon.protocol_version=1"]) + cur = endpoint.connect().cursor() + cur.execute("show neon.protocol_version") + assert cur.fetchone() == ('1',)