Reformat pgxn code, add the typedefs.list that was used

This commit is contained in:
Kirill Bulatov
2022-09-13 23:58:27 +03:00
committed by Kirill Bulatov
parent ba8698bbcb
commit 260ec20a02
13 changed files with 4691 additions and 797 deletions

View File

@@ -188,10 +188,10 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
/*
* We assume the buffer cache is large enough to hold all the buffers
* needed for most operations. Overflowing to this "in-mem smgr" in rare
* cases is OK. But if we find that we're using more than WARN_PAGES,
* print a warning so that we get alerted and get to investigate why
* we're accessing so many buffers.
* needed for most operations. Overflowing to this "in-mem smgr" in
* rare cases is OK. But if we find that we're using more than
* WARN_PAGES, print a warning so that we get alerted and get to
* investigate why we're accessing so many buffers.
*/
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
@@ -207,7 +207,9 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
pg = used_pages;
used_pages++;
INIT_BUFFERTAG(page_tag[pg], reln->smgr_rnode.node, forknum, blocknum);
} else {
}
else
{
elog(DEBUG1, "inmem_write() called for %u/%u/%u.%u blk %u: found at %u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
@@ -226,14 +228,14 @@ BlockNumber
inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
{
/*
* It's not clear why a WAL redo function would call smgrnblocks().
* During recovery, at least before reaching consistency, the size of a
* relation could be arbitrarily small, if it was truncated after the
* record being replayed, or arbitrarily large if it was extended
* afterwards. But one place where it's called is in
* XLogReadBufferExtended(): it extends the relation, if it's smaller than
* the requested page. That's a waste of time in the WAL redo
* process. Pretend that all relations are maximally sized to avoid it.
* It's not clear why a WAL redo function would call smgrnblocks(). During
* recovery, at least before reaching consistency, the size of a relation
* could be arbitrarily small, if it was truncated after the record being
* replayed, or arbitrarily large if it was extended afterwards. But one
* place where it's called is in XLogReadBufferExtended(): it extends the
* relation, if it's smaller than the requested page. That's a waste of
* time in the WAL redo process. Pretend that all relations are maximally
* sized to avoid it.
*/
return MaxBlockNumber;
}

View File

@@ -153,11 +153,11 @@ static void
pageserver_disconnect(void)
{
/*
* If anything goes wrong while we were sending a request, it's not
* clear what state the connection is in. For example, if we sent the
* request but didn't receive a response yet, we might receive the
* response some time later after we have already sent a new unrelated
* request. Close the connection to avoid getting confused.
* If anything goes wrong while we were sending a request, it's not clear
* what state the connection is in. For example, if we sent the request
* but didn't receive a response yet, we might receive the response some
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
{
@@ -191,12 +191,13 @@ pageserver_send(ZenithRequest *request)
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output
* and TCP buffer.
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char* msg = PQerrorMessage(pageserver_conn);
char *msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to send page request: %s", msg);
}
@@ -205,6 +206,7 @@ pageserver_send(ZenithRequest *request)
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
@@ -255,15 +257,16 @@ static void
pageserver_flush(void)
{
if (PQflush(pageserver_conn))
{
char* msg = PQerrorMessage(pageserver_conn);
{
char *msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
}
}
static ZenithResponse *
pageserver_call(ZenithRequest* request)
pageserver_call(ZenithRequest *request)
{
pageserver_send(request);
pageserver_flush();

View File

@@ -7,38 +7,40 @@
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn* pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from libpqprop_async_read */
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* libpqprop_async_read */
};
/* Prototypes for exported functions */
static char* libpqprop_error_message(WalProposerConn* conn);
static WalProposerConnStatusType libpqprop_status(WalProposerConn* conn);
static WalProposerConn* libpqprop_connect_start(char* conninfo);
static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn* conn);
static bool libpqprop_send_query(WalProposerConn* conn, char* query);
static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn* conn);
static pgsocket libpqprop_socket(WalProposerConn* conn);
static int libpqprop_flush(WalProposerConn* conn);
static void libpqprop_finish(WalProposerConn* conn);
static PGAsyncReadResult libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount);
static PGAsyncWriteResult libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size);
static bool libpqprop_blocking_write(WalProposerConn* conn, void const* buf, size_t size);
static char *libpqprop_error_message(WalProposerConn * conn);
static WalProposerConnStatusType libpqprop_status(WalProposerConn * conn);
static WalProposerConn * libpqprop_connect_start(char *conninfo);
static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn * conn);
static bool libpqprop_send_query(WalProposerConn * conn, char *query);
static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn * conn);
static pgsocket libpqprop_socket(WalProposerConn * conn);
static int libpqprop_flush(WalProposerConn * conn);
static void libpqprop_finish(WalProposerConn * conn);
static PGAsyncReadResult libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount);
static PGAsyncWriteResult libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size);
static bool libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size);
static WalProposerFunctionsType PQWalProposerFunctions = {
static WalProposerFunctionsType PQWalProposerFunctions =
{
libpqprop_error_message,
libpqprop_status,
libpqprop_connect_start,
libpqprop_connect_poll,
libpqprop_send_query,
libpqprop_get_query_result,
libpqprop_socket,
libpqprop_flush,
libpqprop_finish,
libpqprop_async_read,
libpqprop_async_write,
libpqprop_blocking_write,
libpqprop_status,
libpqprop_connect_start,
libpqprop_connect_poll,
libpqprop_send_query,
libpqprop_get_query_result,
libpqprop_socket,
libpqprop_flush,
libpqprop_finish,
libpqprop_async_read,
libpqprop_async_write,
libpqprop_blocking_write,
};
/* Module initialization */
@@ -52,7 +54,7 @@ pg_init_libpqwalproposer(void)
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn* conn, bool is_nonblocking)
ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
@@ -67,14 +69,14 @@ ensure_nonblocking_status(WalProposerConn* conn, bool is_nonblocking)
}
/* Exported function definitions */
static char*
libpqprop_error_message(WalProposerConn* conn)
static char *
libpqprop_error_message(WalProposerConn * conn)
{
return PQerrorMessage(conn->pg_conn);
}
static WalProposerConnStatusType
libpqprop_status(WalProposerConn* conn)
libpqprop_status(WalProposerConn * conn)
{
switch (PQstatus(conn->pg_conn))
{
@@ -87,35 +89,38 @@ libpqprop_status(WalProposerConn* conn)
}
}
static WalProposerConn*
libpqprop_connect_start(char* conninfo)
static WalProposerConn *
libpqprop_connect_start(char *conninfo)
{
WalProposerConn* conn;
PGconn* pg_conn;
WalProposerConn *conn;
PGconn *pg_conn;
pg_conn = PQconnectStart(conninfo);
/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully replicate the
* behavior of PQconnectStart here.
* Allocation of a PQconn can fail, and will return NULL. We want to fully
* replicate the behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;
/*
* And in theory this allocation can fail as well, but it's incredibly unlikely if we just
* successfully allocated a PGconn.
* And in theory this allocation can fail as well, but it's incredibly
* unlikely if we just successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if it *did* fail.
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking mode */
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}
static WalProposerConnectPollStatusType
libpqprop_connect_poll(WalProposerConn* conn)
libpqprop_connect_poll(WalProposerConn * conn)
{
WalProposerConnectPollStatusType return_val;
@@ -134,26 +139,34 @@ libpqprop_connect_poll(WalProposerConn* conn)
return_val = WP_CONN_POLLING_OK;
break;
/* There's a comment at its source about this constant being unused. We'll expect it's never
* returned. */
/*
* There's a comment at its source about this constant being
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/* This return is never actually reached, but it's here to make the compiler happy */
/*
* This return is never actually reached, but it's here to make
* the compiler happy
*/
return WP_CONN_POLLING_FAILED;
default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}
return return_val;
}
static bool
libpqprop_send_query(WalProposerConn* conn, char* query)
libpqprop_send_query(WalProposerConn * conn, char *query)
{
/* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush */
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(conn, false))
return false;
@@ -165,13 +178,13 @@ libpqprop_send_query(WalProposerConn* conn, char* query)
}
static WalProposerExecStatusType
libpqprop_get_query_result(WalProposerConn* conn)
libpqprop_get_query_result(WalProposerConn * conn)
{
PGresult* result;
PGresult *result;
WalProposerExecStatusType return_val;
/* Marker variable if we need to log an unexpected success result */
char* unexpected_success = NULL;
char *unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
@@ -182,8 +195,11 @@ libpqprop_get_query_result(WalProposerConn* conn)
result = PQgetResult(conn->pg_conn);
/* PQgetResult returns NULL only if getting the result was successful & there's no more of the
* result to get. */
/*
* PQgetResult returns NULL only if getting the result was successful &
* there's no more of the result to get.
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
@@ -191,7 +207,7 @@ libpqprop_get_query_result(WalProposerConn* conn)
}
/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;
@@ -199,12 +215,12 @@ libpqprop_get_query_result(WalProposerConn* conn)
switch (PQresultStatus(result))
{
/* "true" success case */
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;
/* Unexpected success case */
/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
@@ -220,7 +236,7 @@ libpqprop_get_query_result(WalProposerConn* conn)
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");
/* Failure cases */
/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
@@ -230,7 +246,7 @@ libpqprop_get_query_result(WalProposerConn* conn)
default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}
if (unexpected_success)
@@ -240,19 +256,19 @@ libpqprop_get_query_result(WalProposerConn* conn)
}
static pgsocket
libpqprop_socket(WalProposerConn* conn)
libpqprop_socket(WalProposerConn * conn)
{
return PQsocket(conn->pg_conn);
}
static int
libpqprop_flush(WalProposerConn* conn)
libpqprop_flush(WalProposerConn * conn)
{
return (PQflush(conn->pg_conn));
}
static void
libpqprop_finish(WalProposerConn* conn)
libpqprop_finish(WalProposerConn * conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
@@ -267,9 +283,9 @@ libpqprop_finish(WalProposerConn* conn)
* to this function.
*/
static PGAsyncReadResult
libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount)
libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount)
{
int result;
int result;
if (conn->recvbuf != NULL)
{
@@ -285,12 +301,11 @@ libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount)
return PG_ASYNC_READ_FAIL;
}
/* The docs for PQgetCopyData list the return values as:
* 0 if the copy is still in progress, but no "complete row" is
* available
* -1 if the copy is done
* -2 if an error occured
* (> 0) if it was successful; that value is the amount transferred.
/*
* The docs for PQgetCopyData list the return values as: 0 if the copy is
* still in progress, but no "complete row" is available -1 if the copy is
* done -2 if an error occured (> 0) if it was successful; that value is
* the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
@@ -304,25 +319,28 @@ libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount)
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server failed;
* it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/* If there was actually an error, it'll be properly reported by
* calls to PQerrorMessage -- we don't have to do anything else */
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
/*
* If there was actually an error, it'll be properly reported
* by calls to PQerrorMessage -- we don't have to do anything
* else
*/
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
@@ -336,23 +354,25 @@ libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount)
}
static PGAsyncWriteResult
libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size)
libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size)
{
int result;
int result;
/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;
/* The docs for PQputcopyData list the return values as:
* 1 if the data was queued,
* 0 if it was not queued because of full buffers, or
* -1 if an error occured
/*
* The docs for PQputcopyData list the return values as: 1 if the data was
* queued, 0 if it was not queued because of full buffers, or -1 if an
* error occured
*/
result = PQputCopyData(conn->pg_conn, buf, size);
/* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more */
/*
* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more
*/
Assert(result != 0);
switch (result)
@@ -366,16 +386,17 @@ libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size)
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/* After queueing the data, we still need to flush to get it to send.
* This might take multiple tries, but we don't want to wait around
* until it's done.
/*
* After queueing the data, we still need to flush to get it to send. This
* might take multiple tries, but we don't want to wait around until it's
* done.
*
* PQflush has the following returns (directly quoting the docs):
* 0 if sucessful,
* 1 if it was unable to send all the data in the send queue yet
* -1 if it failed for some reason
* PQflush has the following returns (directly quoting the docs): 0 if
* sucessful, 1 if it was unable to send all the data in the send queue
* yet -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn)) {
switch (result = PQflush(conn->pg_conn))
{
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
@@ -388,16 +409,18 @@ libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size)
}
static bool
libpqprop_blocking_write(WalProposerConn* conn, void const* buf, size_t size)
libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size)
{
int result;
int result;
/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;
/* Ths function is very similar to libpqprop_async_write. For more
* information, refer to the comments there */
/*
* Ths function is very similar to libpqprop_async_write. For more
* information, refer to the comments there
*/
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;

View File

@@ -29,7 +29,8 @@ PG_MODULE_MAGIC;
void _PG_init(void);
void _PG_init(void)
void
_PG_init(void)
{
pg_init_libpagestore();
pg_init_libpqwalproposer();
@@ -59,9 +60,9 @@ pg_cluster_size(PG_FUNCTION_ARGS)
Datum
backpressure_lsns(PG_FUNCTION_ARGS)
{
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
Datum values[3];
bool nulls[3];
TupleDesc tupdesc;

View File

@@ -16,4 +16,4 @@ extern void pg_init_libpagestore(void);
extern void pg_init_libpqwalproposer(void);
extern void pg_init_walproposer(void);
#endif /* NEON_H */
#endif /* NEON_H */

View File

@@ -83,8 +83,8 @@ typedef struct
typedef struct
{
ZenithRequest req;
Oid dbNode;
} ZenithDbSizeRequest;
Oid dbNode;
} ZenithDbSizeRequest;
typedef struct
@@ -123,12 +123,13 @@ typedef struct
{
ZenithMessageTag tag;
int64 db_size;
} ZenithDbSizeResponse;
} ZenithDbSizeResponse;
typedef struct
{
ZenithMessageTag tag;
char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error message */
char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error
* message */
} ZenithErrorResponse;
extern StringInfoData zm_pack_request(ZenithRequest *msg);
@@ -142,12 +143,12 @@ extern char *zm_to_string(ZenithMessage *msg);
typedef struct
{
ZenithResponse *(*request) (ZenithRequest *request);
void (*send) (ZenithRequest *request);
void (*send) (ZenithRequest *request);
ZenithResponse *(*receive) (void);
void (*flush) (void);
void (*flush) (void);
} page_server_api;
extern page_server_api *page_server;
extern page_server_api * page_server;
extern char *page_server_connstring;
extern char *zenith_timeline;
@@ -179,7 +180,7 @@ extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber block
char *buffer);
extern void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
XLogRecPtr request_lsn, bool request_latest, char *buffer);
extern void zenith_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
@@ -217,7 +218,7 @@ extern void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
/* utils for zenith relsize cache */
extern void relsize_hash_init(void);
extern bool get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber* size);
extern bool get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber *size);
extern void set_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size);
extern void update_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);

View File

@@ -94,7 +94,9 @@ const int SmgrTrace = DEBUG5;
page_server_api *page_server;
/* GUCs */
char *page_server_connstring; // with substituted password
char *page_server_connstring;
//with substituted password
char *zenith_timeline;
char *zenith_tenant;
bool wal_redo = false;
@@ -107,7 +109,7 @@ typedef enum
UNLOGGED_BUILD_PHASE_1,
UNLOGGED_BUILD_PHASE_2,
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;
} UnloggedBuildPhase;
static SMgrRelation unlogged_build_rel = NULL;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
@@ -127,31 +129,33 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
#define MAX_PREFETCH_REQUESTS 128
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
int n_prefetch_requests;
int n_prefetch_responses;
int n_prefetched_buffers;
int n_prefetch_hits;
int n_prefetch_misses;
XLogRecPtr prefetch_lsn;
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
int n_prefetch_requests;
int n_prefetch_responses;
int n_prefetched_buffers;
int n_prefetch_hits;
int n_prefetch_misses;
XLogRecPtr prefetch_lsn;
static void
consume_prefetch_responses(void)
{
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++) {
ZenithResponse* resp = page_server->receive();
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
ZenithResponse *resp = page_server->receive();
pfree(resp);
}
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
}
static ZenithResponse*
page_server_request(void const* req)
static ZenithResponse *
page_server_request(void const *req)
{
consume_prefetch_responses();
return page_server->request((ZenithRequest*)req);
return page_server->request((ZenithRequest *) req);
}
@@ -196,11 +200,11 @@ zm_pack_request(ZenithRequest *msg)
{
ZenithDbSizeRequest *msg_req = (ZenithDbSizeRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
break;
break;
}
case T_ZenithGetPageRequest:
{
@@ -546,21 +550,22 @@ zenith_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
else if (lsn == InvalidXLogRecPtr)
{
/*
* When PostgreSQL extends a relation, it calls smgrextend() with an all-zeros pages,
* and we can just ignore that in Zenith. We do need to remember the new size,
* though, so that smgrnblocks() returns the right answer after the rel has
* been extended. We rely on the relsize cache for that.
* When PostgreSQL extends a relation, it calls smgrextend() with an
* all-zeros pages, and we can just ignore that in Zenith. We do need
* to remember the new size, though, so that smgrnblocks() returns the
* right answer after the rel has been extended. We rely on the
* relsize cache for that.
*
* A completely empty heap page doesn't need to be WAL-logged, either. The
* heapam can leave such a page behind, if e.g. an insert errors out after
* initializing the page, but before it has inserted the tuple and WAL-logged
* the change. When we read the page from the page server, it will come back
* as all-zeros. That's OK, the heapam will initialize an all-zeros page on
* first use.
* A completely empty heap page doesn't need to be WAL-logged, either.
* The heapam can leave such a page behind, if e.g. an insert errors
* out after initializing the page, but before it has inserted the
* tuple and WAL-logged the change. When we read the page from the
* page server, it will come back as all-zeros. That's OK, the heapam
* will initialize an all-zeros page on first use.
*
* In other scenarios, evicting a dirty page with no LSN is a bad sign: it implies
* that the page was not WAL-logged, and its contents will be lost when it's
* evicted.
* In other scenarios, evicting a dirty page with no LSN is a bad
* sign: it implies that the page was not WAL-logged, and its contents
* will be lost when it's evicted.
*/
if (PageIsNew(buffer))
{
@@ -691,9 +696,9 @@ zenith_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, Bloc
* Is it possible that the last-written LSN is ahead of last flush
* LSN? Generally not, we shouldn't evict a page from the buffer cache
* before all its modifications have been safely flushed. That's the
* "WAL before data" rule. However, such case does exist at index building,
* _bt_blwritepage logs the full page without flushing WAL before
* smgrextend (files are fsynced before build ends).
* "WAL before data" rule. However, such case does exist at index
* building, _bt_blwritepage logs the full page without flushing WAL
* before smgrextend (files are fsynced before build ends).
*/
#if PG_VERSION_NUM >= 150000
flushlsn = GetFlushRecPtr(NULL);
@@ -728,10 +733,12 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum)
switch (reln->smgr_relpersistence)
{
case 0:
/*
* We don't know if it's an unlogged rel stored locally, or permanent
* rel stored in the page server. First check if it exists locally.
* If it does, great. Otherwise check if it exists in the page server.
* We don't know if it's an unlogged rel stored locally, or
* permanent rel stored in the page server. First check if it
* exists locally. If it does, great. Otherwise check if it exists
* in the page server.
*/
if (mdexists(reln, forkNum))
return true;
@@ -755,11 +762,11 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum)
/*
* \d+ on a view calls smgrexists with 0/0/0 relfilenode. The page server
* will error out if you check that, because the whole dbdir for tablespace
* 0, db 0 doesn't exists. We possibly should change the page server to
* accept that and return 'false', to be consistent with mdexists(). But
* we probably also should fix pg_table_size() to not call smgrexists()
* with bogus relfilenode.
* will error out if you check that, because the whole dbdir for
* tablespace 0, db 0 doesn't exists. We possibly should change the page
* server to accept that and return 'false', to be consistent with
* mdexists(). But we probably also should fix pg_table_size() to not call
* smgrexists() with bogus relfilenode.
*
* For now, handle that special case here.
*/
@@ -880,13 +887,13 @@ void
zenith_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
{
/*
* Might or might not exist locally, depending on whether it's
* an unlogged or permanent relation (or if DEBUG_COMPARE_LOCAL is
* set). Try to unlink, it won't do any harm if the file doesn't
* exist.
* Might or might not exist locally, depending on whether it's an unlogged
* or permanent relation (or if DEBUG_COMPARE_LOCAL is set). Try to
* unlink, it won't do any harm if the file doesn't exist.
*/
mdunlink(rnode, forkNum, isRedo);
if (!RelFileNodeBackendIsTemp(rnode)) {
if (!RelFileNodeBackendIsTemp(rnode))
{
forget_cached_relsize(rnode.node, forkNum);
}
}
@@ -926,8 +933,9 @@ zenith_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
/*
* Check that the cluster size limit has not been exceeded.
*
* Temporary and unlogged relations are not included in the cluster size measured
* by the page server, so ignore those. Autovacuum processes are also exempt.
* Temporary and unlogged relations are not included in the cluster size
* measured by the page server, so ignore those. Autovacuum processes are
* also exempt.
*/
if (max_cluster_size > 0 &&
reln->smgr_relpersistence == RELPERSISTENCE_PERMANENT &&
@@ -937,10 +945,10 @@ zenith_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC")));
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC")));
}
zenith_wallog_page(reln, forkNum, blkno, buffer);
@@ -987,8 +995,8 @@ void
zenith_close(SMgrRelation reln, ForkNumber forknum)
{
/*
* Let md.c close it, if it had it open. Doesn't hurt to do this
* even for permanent relations that have no local storage.
* Let md.c close it, if it had it open. Doesn't hurt to do this even for
* permanent relations that have no local storage.
*/
mdclose(reln, forknum);
}
@@ -1079,17 +1087,18 @@ zenith_writeback(SMgrRelation reln, ForkNumber forknum,
* While function is defined in the zenith extension it's used within neon_test_utils directly.
* To avoid breaking tests in the runtime please keep function signature in sync.
*/
void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
void
zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
{
ZenithResponse *resp;
int i;
int i;
/*
* Try to find prefetched page.
* It is assumed that pages will be requested in the same order as them are prefetched,
* but some other backend may load page in shared buffers, so some prefetch responses should
* be skipped.
* Try to find prefetched page. It is assumed that pages will be requested
* in the same order as them are prefetched, but some other backend may
* load page in shared buffers, so some prefetch responses should be
* skipped.
*/
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
@@ -1099,19 +1108,20 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
prefetch_responses[i].forkNum == forkNum &&
prefetch_responses[i].blockNum == blkno)
{
char* page = ((ZenithGetPageResponse *) resp)->page;
char *page = ((ZenithGetPageResponse *) resp)->page;
/*
* Check if prefetched page is still relevant.
* If it is updated by some other backend, then it should not
* be requested from smgr unless it is evicted from shared buffers.
* In the last case last_evicted_lsn should be updated and
* request_lsn should be greater than prefetch_lsn.
* Maximum with page LSN is used because page returned by page server
* may have LSN either greater either smaller than requested.
* Check if prefetched page is still relevant. If it is updated by
* some other backend, then it should not be requested from smgr
* unless it is evicted from shared buffers. In the last case
* last_evicted_lsn should be updated and request_lsn should be
* greater than prefetch_lsn. Maximum with page LSN is used
* because page returned by page server may have LSN either
* greater either smaller than requested.
*/
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
{
n_prefetched_buffers = i+1;
n_prefetched_buffers = i + 1;
n_prefetch_hits += 1;
n_prefetch_requests = 0;
memcpy(buffer, page, BLCKSZ);
@@ -1133,6 +1143,7 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
.forknum = forkNum,
.blkno = blkno
};
if (n_prefetch_requests > 0)
{
/* Combine all prefetch requests with primary request */
@@ -1471,8 +1482,8 @@ int64
zenith_dbsize(Oid dbNode)
{
ZenithResponse *resp;
int64 db_size;
XLogRecPtr request_lsn;
int64 db_size;
XLogRecPtr request_lsn;
bool latest;
RelFileNode dummy_node = {InvalidOid, InvalidOid, InvalidOid};
@@ -1564,10 +1575,12 @@ zenith_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
XLogFlush(lsn);
/*
* Truncate may affect several chunks of relations. So we should either update last written LSN for all of them,
* or update LSN for "dummy" metadata block. Second approach seems more efficient. If the relation is extended
* again later, the extension will update the last-written LSN for the extended pages, so there's no harm in
* leaving behind obsolete entries for the truncated chunks.
* Truncate may affect several chunks of relations. So we should either
* update last written LSN for all of them, or update LSN for "dummy"
* metadata block. Second approach seems more efficient. If the relation
* is extended again later, the extension will update the last-written LSN
* for the extended pages, so there's no harm in leaving behind obsolete
* entries for the truncated chunks.
*/
SetLastWrittenLSNForRelation(lsn, reln->smgr_rnode.node, forknum);

File diff suppressed because it is too large Load Diff

View File

@@ -14,10 +14,13 @@
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single WAL message */
#define XLOG_HDR_SIZE (1+8*3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender message header */
#define XLOG_HDR_END_POS (1+8) /* offset of end position in wal sender message header */
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single
* WAL message */
#define XLOG_HDR_SIZE (1+8*3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender
* message header */
#define XLOG_HDR_END_POS (1+8) /* offset of end position in wal sender
* message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occured,
@@ -25,12 +28,12 @@
*/
#define WL_NO_EVENTS 0
extern char* wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connect_timeout;
extern bool am_wal_proposer;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connect_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
struct WalProposerConn; /* Defined in libpqwalproposer */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
@@ -44,21 +47,26 @@ typedef enum
{
/* The full read was successful. buf now points to the data */
PG_ASYNC_READ_SUCCESS,
/* The read is ongoing. Wait until the connection is read-ready, then try
* again. */
/*
* The read is ongoing. Wait until the connection is read-ready, then try
* again.
*/
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
{
/* The write fully completed */
PG_ASYNC_WRITE_SUCCESS,
/* The write started, but you'll need to call PQflush some more times
* to finish it off. We just tried, so it's best to wait until the
* connection is read- or write-ready to try again.
/*
* The write started, but you'll need to call PQflush some more times to
* finish it off. We just tried, so it's best to wait until the connection
* is read- or write-ready to try again.
*
* If it becomes read-ready, call PQconsumeInput and flush again. If it
* becomes write-ready, just call PQflush.
@@ -66,7 +74,7 @@ typedef enum
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
@@ -79,8 +87,8 @@ typedef enum
typedef enum
{
/*
* Does not have an active connection and will stay that way until
* further notice.
* Does not have an active connection and will stay that way until further
* notice.
*
* Moves to SS_CONNECTING_WRITE by calls to ResetConnection.
*/
@@ -105,8 +113,8 @@ typedef enum
SS_WAIT_EXEC_RESULT,
/*
* Executing the receiving half of the handshake. After receiving, moves to
* SS_VOTING.
* Executing the receiving half of the handshake. After receiving, moves
* to SS_VOTING.
*/
SS_HANDSHAKE_RECV,
@@ -120,8 +128,9 @@ typedef enum
SS_VOTING,
/*
* Already sent voting information, waiting to receive confirmation from the
* node. After receiving, moves to SS_IDLE, if the quorum isn't reached yet.
* Already sent voting information, waiting to receive confirmation from
* the node. After receiving, moves to SS_IDLE, if the quorum isn't
* reached yet.
*/
SS_WAIT_VERDICT,
@@ -141,7 +150,7 @@ typedef enum
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -156,21 +165,21 @@ typedef uint64 NNodeId;
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
uint32 pgVersion;
pg_uuid_t proposerId;
uint64 systemId; /* Postgres system identifier */
uint8 ztimelineid[16]; /* Zenith timeline id */
uint8 ztenantid[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
uint32 pgVersion;
pg_uuid_t proposerId;
uint64 systemId; /* Postgres system identifier */
uint8 ztimelineid[16]; /* Zenith timeline id */
uint8 ztenantid[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
uint64 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
@@ -180,7 +189,7 @@ typedef struct AcceptorGreeting
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
@@ -189,36 +198,39 @@ typedef struct VoteRequest
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse {
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
term_t term;
uint64 voteGiven;
term_t term;
uint64 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
* proposer to choose the most advanced one.
* proposer to choose the most advanced one.
*/
XLogRecPtr flushLsn;
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for recovery of some safekeeper */
XLogRecPtr flushLsn;
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
@@ -226,60 +238,62 @@ typedef struct VoteResponse {
*/
typedef struct ProposerElected
{
uint64 tag;
term_t term;
uint64 tag;
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
uint64 tag;
term_t term; /* term of the proposer */
uint64 tag;
term_t term; /* term of the proposer */
/*
* LSN since which current proposer appends WAL (begin_lsn of its first
* record); determines epoch switch point.
*/
XLogRecPtr epochStartLsn;
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
XLogRecPtr epochStartLsn;
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end lsn
* + 1 of last chunk streamed to everyone)
* minimal LSN which may be needed for recovery of some safekeeper (end
* lsn + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
*/
typedef struct HotStandbyFeedback
{
TimestampTz ts;
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
} HotStandbyFeedback;
typedef struct ReplicationFeedback
typedef struct ReplicationFeedback
{
// current size of the timeline on pageserver
uint64 currentClusterSize;
// standby_status_update fields that safekeeper received from pageserver
XLogRecPtr ps_writelsn;
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
/* current size of the timeline on pageserver */
uint64 currentClusterSize;
/* standby_status_update fields that safekeeper received from pageserver */
XLogRecPtr ps_writelsn;
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
TimestampTz ps_replytime;
} ReplicationFeedback;
} ReplicationFeedback;
typedef struct WalproposerShmemState
@@ -288,7 +302,7 @@ typedef struct WalproposerShmemState
ReplicationFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
@@ -296,25 +310,26 @@ typedef struct WalproposerShmemState
typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
*/
term_t term;
// TODO: add comment
XLogRecPtr flushLsn;
// Safekeeper reports back his awareness about which WAL is committed, as
// this is a criterion for walproposer --sync mode exit
XLogRecPtr commitLsn;
term_t term;
/* TODO: add comment */
XLogRecPtr flushLsn;
/* Safekeeper reports back his awareness about which WAL is committed, as */
/* this is a criterion for walproposer --sync mode exit */
XLogRecPtr commitLsn;
HotStandbyFeedback hs;
// Feedback recieved from pageserver includes standby_status_update fields
// and custom zenith feedback.
// This part of the message is extensible.
/* Feedback recieved from pageserver includes standby_status_update fields */
/* and custom zenith feedback. */
/* This part of the message is extensible. */
ReplicationFeedback rf;
} AppendResponse;
} AppendResponse;
// ReplicationFeedback is extensible part of the message that is parsed separately
// Other fields are fixed part
/* ReplicationFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
@@ -323,9 +338,10 @@ typedef struct AppendResponse
*/
typedef struct Safekeeper
{
char const* host;
char const* port;
char conninfo[MAXCONNINFO]; /* connection info for connecting/reconnecting */
char const *host;
char const *port;
char conninfo[MAXCONNINFO]; /* connection info for
* connecting/reconnecting */
/*
* postgres protocol connection to the WAL acceptor
@@ -333,46 +349,50 @@ typedef struct Safekeeper
* Equals NULL only when state = SS_OFFLINE. Nonblocking is set once we
* reach SS_ACTIVE; not before.
*/
WalProposerConn* conn;
WalProposerConn *conn;
/*
* Temporary buffer for the message being sent to the safekeeper.
*/
StringInfoData outbuf;
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState* xlogreader;
XLogReaderState *xlogreader;
/*
* Streaming will start here; must be record boundary.
*/
XLogRecPtr startStreamingAt;
XLogRecPtr startStreamingAt;
bool flushWrite; /* set to true if we need to call AsyncFlush, to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
bool flushWrite; /* set to true if we need to call AsyncFlush,
* to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
int eventPos; /* position in wait event set. Equal to -1 if no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz startedConnAt; /* when connection attempt started */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
int eventPos; /* position in wait event set. Equal to -1 if
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz startedConnAt; /* when connection attempt started */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern PGDLLIMPORT void WalProposerMain(Datum main_arg);
void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
void WalProposerPoll(void);
void WalProposerRegister(void);
void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback *rf);
void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
void WalProposerPoll(void);
void WalProposerRegister(void);
void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback * rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
Size WalproposerShmemSize(void);
bool WalproposerShmemInit(void);
void replication_feedback_set(ReplicationFeedback *rf);
void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
Size WalproposerShmemSize(void);
bool WalproposerShmemInit(void);
void replication_feedback_set(ReplicationFeedback * rf);
void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
@@ -383,29 +403,37 @@ typedef enum
WP_CONN_POLLING_READING,
WP_CONN_POLLING_WRITING,
WP_CONN_POLLING_OK,
/*
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
{
/* We received a single CopyBoth result */
WP_EXEC_SUCCESS_COPYBOTH,
/* Any success result other than a single CopyBoth was received. The specifics of the result
* were already logged, but it may be useful to provide an error message indicating which
* safekeeper messed up.
/*
* Any success result other than a single CopyBoth was received. The
* specifics of the result were already logged, but it may be useful to
* provide an error message indicating which safekeeper messed up.
*
* Do not expect PQerrorMessage to be appropriately set. */
* Do not expect PQerrorMessage to be appropriately set.
*/
WP_EXEC_UNEXPECTED_SUCCESS,
/* No result available at this time. Wait until read-ready, then call again. Internally, this is
* returned when PQisBusy indicates that PQgetResult would block. */
/*
* No result available at this time. Wait until read-ready, then call
* again. Internally, this is returned when PQisBusy indicates that
* PQgetResult would block.
*/
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
@@ -414,40 +442,39 @@ typedef enum
WP_CONNECTION_BAD,
/*
* The original ConnStatusType has many more tags, but requests that
* they not be relied upon (except for displaying to the user). We
* don't need that extra functionality, so we collect them into a
* single tag here.
* The original ConnStatusType has many more tags, but requests that they
* not be relied upon (except for displaying to the user). We don't need
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
typedef char* (*walprop_error_message_fn) (WalProposerConn* conn);
typedef char *(*walprop_error_message_fn) (WalProposerConn * conn);
/* Re-exported PQstatus */
typedef WalProposerConnStatusType (*walprop_status_fn) (WalProposerConn* conn);
typedef WalProposerConnStatusType(*walprop_status_fn) (WalProposerConn * conn);
/* Re-exported PQconnectStart */
typedef WalProposerConn* (*walprop_connect_start_fn) (char* conninfo);
typedef WalProposerConn * (*walprop_connect_start_fn) (char *conninfo);
/* Re-exported PQconectPoll */
typedef WalProposerConnectPollStatusType (*walprop_connect_poll_fn) (WalProposerConn* conn);
typedef WalProposerConnectPollStatusType(*walprop_connect_poll_fn) (WalProposerConn * conn);
/* Blocking wrapper around PQsendQuery */
typedef bool (*walprop_send_query_fn) (WalProposerConn* conn, char* query);
typedef bool (*walprop_send_query_fn) (WalProposerConn * conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
typedef WalProposerExecStatusType (*walprop_get_query_result_fn) (WalProposerConn* conn);
typedef WalProposerExecStatusType(*walprop_get_query_result_fn) (WalProposerConn * conn);
/* Re-exported PQsocket */
typedef pgsocket (*walprop_socket_fn) (WalProposerConn* conn);
typedef pgsocket (*walprop_socket_fn) (WalProposerConn * conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
typedef int (*walprop_flush_fn) (WalProposerConn* conn);
typedef int (*walprop_flush_fn) (WalProposerConn * conn);
/* Re-exported PQfinish */
typedef void (*walprop_finish_fn) (WalProposerConn* conn);
typedef void (*walprop_finish_fn) (WalProposerConn * conn);
/*
* Ergonomic wrapper around PGgetCopyData
@@ -463,9 +490,9 @@ typedef void (*walprop_finish_fn) (WalProposerConn* conn);
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
typedef PGAsyncReadResult (*walprop_async_read_fn) (WalProposerConn* conn,
char** buf,
int* amount);
typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn,
char **buf,
int *amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
@@ -474,33 +501,33 @@ typedef PGAsyncReadResult (*walprop_async_read_fn) (WalProposerConn* conn,
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
typedef PGAsyncWriteResult (*walprop_async_write_fn) (WalProposerConn* conn,
void const* buf,
size_t size);
typedef PGAsyncWriteResult(*walprop_async_write_fn) (WalProposerConn * conn,
void const *buf,
size_t size);
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
typedef bool (*walprop_blocking_write_fn) (WalProposerConn* conn, void const* buf, size_t size);
typedef bool (*walprop_blocking_write_fn) (WalProposerConn * conn, void const *buf, size_t size);
/* All libpqwalproposer exported functions collected together. */
typedef struct WalProposerFunctionsType
{
walprop_error_message_fn walprop_error_message;
walprop_status_fn walprop_status;
walprop_connect_start_fn walprop_connect_start;
walprop_connect_poll_fn walprop_connect_poll;
walprop_send_query_fn walprop_send_query;
walprop_get_query_result_fn walprop_get_query_result;
walprop_socket_fn walprop_socket;
walprop_flush_fn walprop_flush;
walprop_finish_fn walprop_finish;
walprop_async_read_fn walprop_async_read;
walprop_async_write_fn walprop_async_write;
walprop_blocking_write_fn walprop_blocking_write;
} WalProposerFunctionsType;
walprop_error_message_fn walprop_error_message;
walprop_status_fn walprop_status;
walprop_connect_start_fn walprop_connect_start;
walprop_connect_poll_fn walprop_connect_poll;
walprop_send_query_fn walprop_send_query;
walprop_get_query_result_fn walprop_get_query_result;
walprop_socket_fn walprop_socket;
walprop_flush_fn walprop_flush;
walprop_finish_fn walprop_finish;
walprop_async_read_fn walprop_async_read;
walprop_async_write_fn walprop_async_write;
walprop_blocking_write_fn walprop_blocking_write;
} WalProposerFunctionsType;
/* Allow the above functions to be "called" with normal syntax */
#define walprop_error_message(conn) \
@@ -536,8 +563,8 @@ typedef struct WalProposerFunctionsType
* This pointer is set by the initializer in libpqwalproposer, so that we
* can use it later.
*/
extern PGDLLIMPORT WalProposerFunctionsType *WalProposerFunctions;
extern PGDLLIMPORT WalProposerFunctionsType * WalProposerFunctions;
extern uint64 BackpressureThrottlingTime(void);
#endif /* __NEON_WALPROPOSER_H__ */
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -127,10 +127,10 @@ CompareLsn(const void *a, const void *b)
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char*
char *
FormatSafekeeperState(SafekeeperState state)
{
char* return_val = NULL;
char *return_val = NULL;
switch (state)
{
@@ -171,27 +171,30 @@ FormatSafekeeperState(SafekeeperState state)
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper* sk)
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component.
* (b) if we are expecting something, there's overlap
* (i.e. `events & expected != 0`)
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more clear later */
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE)) == 0);
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/* To give a descriptive message in the case of failure, we use elog and
* then an assertion that's guaranteed to fail. */
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
@@ -204,12 +207,12 @@ AssertEventsOkForState(uint32 events, Safekeeper* sk)
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
@@ -217,33 +220,35 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/* Idle states use read-readiness as a sign that the connection has been
* disconnected. */
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing.
* Active state does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We should
* check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
@@ -263,27 +268,30 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char*
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/* The formatting here isn't supposed to be *particularly* useful -- it's just to give an
* sense of what events have been triggered without needing to remember your powers of two. */
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET ) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE ) ? 'R' : '_';
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT ) ? 'T' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
@@ -291,7 +299,7 @@ FormatEvents(uint32 events)
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
@@ -407,21 +415,21 @@ XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
// FIXME Is it ok to use hardcoded value here?
TimeLineID tli = 1;
#else
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
#endif
}
/* Calculate the start offset of the received logs */
@@ -483,6 +491,7 @@ XLogWalPropClose(XLogRecPtr recptr)
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
@@ -508,12 +517,12 @@ StartProposerReplication(StartReplicationCmd *cmd)
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/* create xlogreader for physical replication */
xlogreader =
@@ -525,7 +534,7 @@ StartProposerReplication(StartReplicationCmd *cmd)
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
errmsg("out of memory")));
/*
* We assume here that we're logging enough information in the WAL for
@@ -542,7 +551,7 @@ StartProposerReplication(StartReplicationCmd *cmd)
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
@@ -630,9 +639,9 @@ StartProposerReplication(StartReplicationCmd *cmd)
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
LSN_FORMAT_ARGS(cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
LSN_FORMAT_ARGS(switchpoint))));
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
LSN_FORMAT_ARGS(switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
@@ -869,14 +878,14 @@ WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
xlogfname)));
errmsg("requested WAL segment %s has already been removed",
xlogfname)));
}
else
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
errmsg("could not open file \"%s\": %m",
path)));
}
@@ -943,7 +952,7 @@ XLogSendPhysical(void)
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes PG_USED_FOR_ASSERTS_ONLY;
TimeLineID currTLI;
TimeLineID currTLI;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -1004,8 +1013,8 @@ XLogSendPhysical(void)
{
/*
* Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? currTLI was
* updated by the GetStandbyFlushRecPtr() call above.
* still the one recovery is recovering from? currTLI was updated
* by the GetStandbyFlushRecPtr() call above.
*/
if (sendTimeLine != currTLI)
becameHistoric = true;
@@ -1043,11 +1052,11 @@ XLogSendPhysical(void)
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
#if PG_VERSION_NUM >= 150000
SendRqstPtr = GetFlushRecPtr(NULL);
#else
#else
SendRqstPtr = GetFlushRecPtr();
#endif
#endif
}
/*
@@ -1180,4 +1189,3 @@ XLogSendPhysical(void)
set_ps_display(activitymsg);
}
}

View File

@@ -3,17 +3,17 @@
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char* FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper* sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char* FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
int CompareLsn(const void *a, const void *b);
char *FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char *FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */
#endif /* __NEON_WALPROPOSER_UTILS_H__ */

View File

@@ -39,8 +39,8 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
* Linkage to functions in zenith module.
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
*/
typedef void (*zenith_read_at_lsn_type)(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
typedef void (*zenith_read_at_lsn_type) (RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
static zenith_read_at_lsn_type zenith_read_at_lsn_ptr;
@@ -136,8 +136,8 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
/*
* Pin the buffer, and release it again. Because we have
* zenith_test_evict==true, this will evict the page from
* the buffer cache if no one else is holding a pin on it.
* zenith_test_evict==true, this will evict the page from the
* buffer cache if no one else is holding a pin on it.
*/
if (isvalid)
{
@@ -177,8 +177,8 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
text *forkname;
uint32 blkno;
bool request_latest = PG_ARGISNULL(3);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
bool request_latest = PG_ARGISNULL(3);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
PG_RETURN_NULL();
@@ -262,7 +262,7 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
errmsg("must be superuser to use raw page functions")));
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) ||
PG_ARGISNULL(3) || PG_ARGISNULL(4))
@@ -271,19 +271,20 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
{
RelFileNode rnode = {
.spcNode = PG_GETARG_OID(0),
.dbNode = PG_GETARG_OID(1),
.dbNode = PG_GETARG_OID(1),
.relNode = PG_GETARG_OID(2)
};
ForkNumber forknum = PG_GETARG_UINT32(3);
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);
bool request_latest = PG_ARGISNULL(5);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
uint32 blkno = PG_GETARG_UINT32(4);
bool request_latest = PG_ARGISNULL(5);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
/* Initialize buffer to copy to */
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
@@ -298,7 +299,8 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
Datum
neon_xlogflush(PG_FUNCTION_ARGS)
{
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogFlush(lsn);
PG_RETURN_VOID();
}

3776
pgxn/typedefs.list Normal file

File diff suppressed because it is too large Load Diff