Compare commits

...

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
f9dbed33a7 Replace not-thread safe LWlocks in LFC with pthread sync primitives 2025-02-21 08:43:43 +02:00
Konstantin Knizhnik
d677e85a14 Undo atomics optimization 2025-02-20 14:53:08 +02:00
Konstantin Knizhnik
598463bffe Fix issues with prtefetch 2025-02-20 10:16:54 +02:00
Konstantin Knizhnik
6d84c4057d Fix issues with prtefetch 2025-02-20 10:14:51 +02:00
Konstantin Knizhnik
43497cd7c0 Save getpage result in LFC 2025-02-19 17:58:21 +02:00
Konstantin Knizhnik
bc07358034 First working version 2025-02-19 16:59:07 +02:00
Konstantin Knizhnik
426101e38f Use pthread cond instead of Postgres latch for notification of coordinator 2025-02-18 19:19:32 +02:00
Konstantin Knizhnik
914cc529ba Fix more bugs 2025-02-18 10:18:13 +02:00
Konstantin Knizhnik
06ebb42db5 Bug fixing 2025-02-17 22:32:49 +02:00
Konstantin Knizhnik
93c92eb170 Fix bugs 2025-02-16 21:46:58 +02:00
Konstantin Knizhnik
bd67d1b7c8 Fix bugs 2025-02-16 21:46:44 +02:00
Konstantin Knizhnik
cfb1b63351 Fix bugs 2025-02-16 21:46:24 +02:00
Konstantin Knizhnik
782ad8fda4 Impoleent PS communicator 2025-02-16 18:14:34 +02:00
6 changed files with 1394 additions and 3122 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
uint32 WAIT_EVENT_NEON_LFC_WRITE;
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
uint32 WAIT_EVENT_NEON_PS_STARTING;
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");

View File

@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
extern uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION

View File

@@ -50,11 +50,18 @@ typedef enum
typedef uint64 NeonRequestId;
/* base struct for c-style inheritance */
typedef struct
{
NeonMessageTag tag;
NeonRequestId reqid;
union {
struct {
int procno; /* process number */
Buffer bufid; /* InvalidBuffer for prefetch */
} recepient;
NeonRequestId reqid; /* two fields above temporary replace reqid, just to preserve protocol */
} u;
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} NeonMessage;
@@ -140,6 +147,30 @@ typedef struct
int segno;
} NeonGetSlruSegmentRequest;
typedef union {
NeonRequest hdr;
NeonNblocksRequest exists;
NeonNblocksRequest nblocks;
NeonDbSizeRequest dbsize;
NeonGetPageRequest page;
NeonGetSlruSegmentRequest slru;
} NeonCommunicatorRequest;
typedef struct
{
NeonMessageTag tag;
int64 value;
} NeonCommunicatorResponse;
typedef struct
{
uint64 write_pos;
pthread_mutex_t mutex;
pthread_cond_t cond;
NeonCommunicatorRequest* requests;
} NeonCommunicatorChannel;
/* supertype of all the Neon*Response structs below */
typedef NeonMessage NeonResponse;
@@ -161,6 +192,7 @@ typedef struct
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
@@ -184,57 +216,22 @@ typedef struct
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg);
extern void nm_pack_request(StringInfo s, NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);
/*
* API
*/
typedef uint16 shardno_t;
typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*
* Blocking read for the next response of this shard.
*
* When a CANCEL signal is handled, the connection state will be
* unmodified.
*/
NeonResponse *(*receive) (shardno_t shard_no);
/*
* Try get the next response from the TCP buffers, if any.
* Returns NULL when the data is not yet available.
*/
NeonResponse *(*try_receive) (shardno_t shard_no);
/*
* Make sure all requests are sent to PageServer.
*/
bool (*flush) (shardno_t shard_no);
/*
* Disconnect from this pageserver shard.
*/
void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
extern page_server_api *page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;
extern shardno_t get_shard_number(BufferTag* tag);
extern int get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum);
extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
@@ -301,14 +298,17 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern bool lfc_enabled(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
bits8 rv = 1;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
@@ -319,4 +319,9 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
extern void communicator_send_request(int shard, NeonCommunicatorRequest* req);
extern int64 communicator_receive_response(void);
extern int64 communicator_request(int shard, NeonCommunicatorRequest* req);
#endif

File diff suppressed because it is too large Load Diff