Impoleent PS communicator

This commit is contained in:
Konstantin Knizhnik
2025-02-16 18:14:34 +02:00
parent 646e011c4d
commit 782ad8fda4
3 changed files with 633 additions and 2389 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -54,7 +54,13 @@ typedef uint64 NeonRequestId;
typedef struct
{
NeonMessageTag tag;
NeonRequestId reqid;
union {
struct {
int procno; /* process number */
Buffer bufid; /* InvalidBuffer for prefetch */
} recepient;
NeonRequestId reqid; /* two fields above temporary replace reqid, just to preserve protocol */
} u;
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} NeonMessage;
@@ -140,6 +146,30 @@ typedef struct
int segno;
} NeonGetSlruSegmentRequest;
typedef union {
NeonRequest hdr;
NeonNblocksRequest exists;
NeonNblocksRequest nblocks;
NeonDbSizeRequest dbsize;
NeonGetPageRequest page;
NeonGetSlruSegmentRequest slru;
} NeonCommunicatorRequest;
typedef struct
{
NeonMessageTag tag;
int64 value;
} NeonCommunicatorResponse;
typedef struct
{
pg_atomic_uint64 write_pos;
pg_atomic_uint64 read_pos;
Latch latch;
NeonCommunicatorRequest* requests;
} NeonCommunicatorChannel;
/* supertype of all the Neon*Response structs below */
typedef NeonMessage NeonResponse;
@@ -161,6 +191,7 @@ typedef struct
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
@@ -184,50 +215,14 @@ typedef struct
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg);
extern void nm_pack_request(StringInfo s, NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);
/*
* API
*/
typedef uint16 shardno_t;
typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*
* Blocking read for the next response of this shard.
*
* When a CANCEL signal is handled, the connection state will be
* unmodified.
*/
NeonResponse *(*receive) (shardno_t shard_no);
/*
* Try get the next response from the TCP buffers, if any.
* Returns NULL when the data is not yet available.
*/
NeonResponse *(*try_receive) (shardno_t shard_no);
/*
* Make sure all requests are sent to PageServer.
*/
bool (*flush) (shardno_t shard_no);
/*
* Disconnect from this pageserver shard.
*/
void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
extern page_server_api *page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
@@ -319,4 +314,9 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
extern void communicator_send_request(int shard, NeonCommunicationRequest* req);
extern int64 communicator_receive_response(void);
extern int64 communicator_request(int shard, NeonCommunicationRequest* req);
#endif

File diff suppressed because it is too large Load Diff