Extract neon and neon_test_utils from postgres repo (#2325)

* Extract neon and neon_test_utils from postgres repo
* Remove neon from vendored postgres repo, and fix build_and_test.yml
* Move EmitWarningsOnPlaceholders to end of _PG_init in neon.c (from libpagestore.c)
* Fix Makefile location comments
* remove Makefile EXTRA_INSTALL flag
* Update Dockerfile.compute-node to build and include the neon extension
This commit is contained in:
MMeent
2022-08-25 18:48:09 +02:00
committed by GitHub
parent bc588f3a53
commit 04a018a5b1
24 changed files with 7830 additions and 10 deletions

View File

@@ -136,6 +136,10 @@ jobs:
run: mold -run make postgres -j$(nproc)
shell: bash -euxo pipefail {0}
- name: Build neon extensions
run: mold -run make neon-pg-ext -j$(nproc)
shell: bash -euxo pipefail {0}
- name: Run cargo build
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests

View File

@@ -81,6 +81,9 @@ jobs:
if: steps.cache_pg.outputs.cache-hit != 'true'
run: make postgres
- name: Build neon extensions
run: make neon-pg-ext
# Plain configure output can contain weird errors like 'error: C compiler cannot create executables'
# and the real cause will be inside config.log
- name: Print configure logs in case of failure

View File

@@ -13,7 +13,8 @@ RUN cd postgres && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install
# Build PostGIS from the upstream PostGIS mirror. PostGIS compiles against neon postgres sources without changes.
# Perhaps we could even use the upstream binaries, compiled against vanilla Postgres, but it would require some
@@ -55,6 +56,16 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.3.tar.gz && \
make install && \
rm -rf /plv8-*
# compile neon extensions
FROM build-deps AS neon-pg-ext-build
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon \
-s install
# Compile and run the Neon-specific `compute_ctl` binary
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:$TAG AS compute-tools
USER nonroot
@@ -73,8 +84,8 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build#
COPY --from=plv8-build --chown=postgres /usr/local/pgsql /usr/local
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=neon-pg-ext-build --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/compute_tools/target/release/compute_ctl /usr/local/bin/compute_ctl
RUN apt update && \

View File

@@ -51,7 +51,7 @@ CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
# Top level Makefile to build Zenith and PostgreSQL
#
.PHONY: all
all: zenith postgres
all: zenith postgres neon-pg-ext
### Zenith Rust bits
#
@@ -87,25 +87,39 @@ postgres: postgres-configure \
postgres-headers # to prevent `make install` conflicts with zenith's `postgres-headers`
+@echo "Compiling PostgreSQL"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 install
+@echo "Compiling contrib/neon"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon install
+@echo "Compiling contrib/neon_test_utils"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/neon_test_utils install
+@echo "Compiling libpq"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/src/interfaces/libpq install
+@echo "Compiling pg_buffercache"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pg_buffercache install
+@echo "Compiling pageinspect"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pageinspect install
.PHONY: postgres-clean
postgres-clean:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/src/interfaces/libpq clean
neon-pg-ext: postgres
+@echo "Compiling neon"
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/bin/pg_config \
-C $(ROOT_PROJECT_DIR)/pgxn/neon install
+@echo "Compiling neon_test_utils"
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/bin/pg_config \
-C $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils install
.PHONY: neon-pg-ext-clean
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon clean
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils clean
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean:
cd $(POSTGRES_INSTALL_DIR)/build && $(MAKE) clean
$(CARGO_CMD_PREFIX) cargo clean
cd pgxn/neon && $(MAKE) clean
cd pgxn/neon_test_utils && $(MAKE) clean
# This removes everything
.PHONY: distclean

26
pgxn/neon/Makefile Normal file
View File

@@ -0,0 +1,26 @@
# pgxs/neon/Makefile
MODULE_big = neon
OBJS = \
$(WIN32RES) \
inmem_smgr.o \
libpagestore.o \
libpqwalproposer.o \
pagestore_smgr.o \
relsize_cache.o \
neon.o \
walproposer.o \
walproposer_utils.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = neon
DATA = neon--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

286
pgxn/neon/inmem_smgr.c Normal file
View File

@@ -0,0 +1,286 @@
/*-------------------------------------------------------------------------
*
* inmem_smgr.c
*
* This is an implementation of the SMGR interface, used in the WAL redo
* process (see src/backend/tcop/zenith_wal_redo.c). It has no persistent
* storage, the pages that are written out are kept in a small number of
* in-memory buffers.
*
* Normally, replaying a WAL record only needs to access a handful of
* buffers, which fit in the normal buffer cache, so this is just for
* "overflow" storage when the buffer cache is not large enough.
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* contrib/neon/inmem_smgr.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xlog.h"
#include "pagestore_client.h"
#include "storage/block.h"
#include "storage/buf_internals.h"
#include "storage/relfilenode.h"
#include "storage/smgr.h"
/* Size of the in-memory smgr */
#define MAX_PAGES 64
/* If more than WARN_PAGES are used, print a warning in the log */
#define WARN_PAGES 32
static BufferTag page_tag[MAX_PAGES];
static char page_body[MAX_PAGES][BLCKSZ];
static int used_pages;
static int
locate_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno)
{
/* We only hold a small number of pages, so linear search */
for (int i = 0; i < used_pages; i++)
{
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode)
&& forknum == page_tag[i].forkNum
&& blkno == page_tag[i].blockNum)
{
return i;
}
}
return -1;
}
/*
* inmem_init() -- Initialize private state
*/
void
inmem_init(void)
{
used_pages = 0;
}
/*
* inmem_exists() -- Does the physical file exist?
*/
bool
inmem_exists(SMgrRelation reln, ForkNumber forknum)
{
for (int i = 0; i < used_pages; i++)
{
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode)
&& forknum == page_tag[i].forkNum)
{
return true;
}
}
return false;
}
/*
* inmem_create() -- Create a new relation on zenithd storage
*
* If isRedo is true, it's okay for the relation to exist already.
*/
void
inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo)
{
}
/*
* inmem_unlink() -- Unlink a relation.
*/
void
inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo)
{
}
/*
* inmem_extend() -- Add a block to the specified relation.
*
* The semantics are nearly the same as mdwrite(): write at the
* specified position. However, this is to be used for the case of
* extending a relation (i.e., blocknum is at or beyond the current
* EOF). Note that we assume writing a block beyond current EOF
* causes intervening file space to become filled with zeroes.
*/
void
inmem_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
char *buffer, bool skipFsync)
{
/* same as smgwrite() for us */
inmem_write(reln, forknum, blkno, buffer, skipFsync);
}
/*
* inmem_open() -- Initialize newly-opened relation.
*/
void
inmem_open(SMgrRelation reln)
{
}
/*
* inmem_close() -- Close the specified relation, if it isn't closed already.
*/
void
inmem_close(SMgrRelation reln, ForkNumber forknum)
{
}
/*
* inmem_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/
bool
inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
{
return true;
}
/*
* inmem_writeback() -- Tell the kernel to write pages back to storage.
*/
void
inmem_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks)
{
}
/*
* inmem_read() -- Read the specified block from a relation.
*/
void
inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
char *buffer)
{
int pg;
pg = locate_page(reln, forknum, blkno);
if (pg < 0)
memset(buffer, 0, BLCKSZ);
else
memcpy(buffer, page_body[pg], BLCKSZ);
}
/*
* inmem_write() -- Write the supplied block at the appropriate location.
*
* This is to be used only for updating already-existing blocks of a
* relation (ie, those before the current EOF). To extend a relation,
* use mdextend().
*/
void
inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer, bool skipFsync)
{
int pg;
pg = locate_page(reln, forknum, blocknum);
if (pg < 0)
{
/*
* We assume the buffer cache is large enough to hold all the buffers
* needed for most operations. Overflowing to this "in-mem smgr" in rare
* cases is OK. But if we find that we're using more than WARN_PAGES,
* print a warning so that we get alerted and get to investigate why
* we're accessing so many buffers.
*/
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
forknum,
blocknum,
used_pages);
if (used_pages == MAX_PAGES)
elog(ERROR, "Inmem storage overflow");
pg = used_pages;
used_pages++;
INIT_BUFFERTAG(page_tag[pg], reln->smgr_rnode.node, forknum, blocknum);
} else {
elog(DEBUG1, "inmem_write() called for %u/%u/%u.%u blk %u: found at %u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
forknum,
blocknum,
used_pages);
}
memcpy(page_body[pg], buffer, BLCKSZ);
}
/*
* inmem_nblocks() -- Get the number of blocks stored in a relation.
*/
BlockNumber
inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
{
/*
* It's not clear why a WAL redo function would call smgrnblocks().
* During recovery, at least before reaching consistency, the size of a
* relation could be arbitrarily small, if it was truncated after the
* record being replayed, or arbitrarily large if it was extended
* afterwards. But one place where it's called is in
* XLogReadBufferExtended(): it extends the relation, if it's smaller than
* the requested page. That's a waste of time in the WAL redo
* process. Pretend that all relations are maximally sized to avoid it.
*/
return MaxBlockNumber;
}
/*
* inmem_truncate() -- Truncate relation to specified number of blocks.
*/
void
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
}
/*
* inmem_immedsync() -- Immediately sync a relation to stable storage.
*/
void
inmem_immedsync(SMgrRelation reln, ForkNumber forknum)
{
}
static const struct f_smgr inmem_smgr =
{
.smgr_init = inmem_init,
.smgr_shutdown = NULL,
.smgr_open = inmem_open,
.smgr_close = inmem_close,
.smgr_create = inmem_create,
.smgr_exists = inmem_exists,
.smgr_unlink = inmem_unlink,
.smgr_extend = inmem_extend,
.smgr_prefetch = inmem_prefetch,
.smgr_read = inmem_read,
.smgr_write = inmem_write,
.smgr_writeback = inmem_writeback,
.smgr_nblocks = inmem_nblocks,
.smgr_truncate = inmem_truncate,
.smgr_immedsync = inmem_immedsync,
};
const f_smgr *
smgr_inmem(BackendId backend, RelFileNode rnode)
{
Assert(InRecovery);
if (backend != InvalidBackendId)
return smgr_standard(backend, rnode);
else
return &inmem_smgr;
}
void
smgr_init_inmem()
{
inmem_init();
}

432
pgxn/neon/libpagestore.c Normal file
View File

@@ -0,0 +1,432 @@
/*-------------------------------------------------------------------------
*
* libpagestore.c
* Handles network communications with the remote pagestore.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/neon/libpqpagestore.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/guc.h"
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#define PageStoreTrace DEBUG5
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ## __VA_ARGS__), \
errhidestmt(true), errhidecontext(true)))
bool connected = false;
PGconn *pageserver_conn = NULL;
char *page_server_connstring_raw;
static ZenithResponse *pageserver_call(ZenithRequest *request);
page_server_api api = {
.request = pageserver_call
};
static void
pageserver_connect()
{
char *query;
int ret;
Assert(!connected);
pageserver_conn = PQconnectdb(page_server_connstring);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
errdetail_internal("%s", msg)));
}
query = psprintf("pagestream %s %s", zenith_tenant, zenith_timeline);
ret = PQsendQuery(pageserver_conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
neon_log(ERROR, "could not send pagestream command to pageserver");
}
while (PQisBusy(pageserver_conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
}
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring_raw);
connected = true;
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
}
goto retry;
}
return ret;
}
static ZenithResponse *
pageserver_call(ZenithRequest *request)
{
StringInfoData req_buff;
StringInfoData resp_buff;
ZenithResponse *resp;
PG_TRY();
{
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
if (!connected)
pageserver_connect();
req_buff = zm_pack_request(request);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output
* and TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
{
neon_log(ERROR, "failed to send page request: %s",
PQerrorMessage(pageserver_conn));
}
pfree(req_buff.data);
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len == -1)
neon_log(ERROR, "end of COPY");
else if (resp_buff.len == -2)
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
resp = zm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data);
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) resp);
neon_log(PageStoreTrace, "got response: %s", msg);
pfree(msg);
}
}
PG_CATCH();
{
/*
* If anything goes wrong while we were sending a request, it's not
* clear what state the connection is in. For example, if we sent the
* request but didn't receive a response yet, we might receive the
* response some time later after we have already sent a new unrelated
* request. Close the connection to avoid getting confused.
*/
if (connected)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
PG_RE_THROW();
}
PG_END_TRY();
return (ZenithResponse *) resp;
}
static bool
check_zenith_id(char **newval, void **extra, GucSource source)
{
uint8 zid[16];
return **newval == '\0' || HexDecodeString(zid, *newval, 16);
}
static char *
substitute_pageserver_password(const char *page_server_connstring_raw)
{
char *host = NULL;
char *port = NULL;
char *user = NULL;
char *auth_token = NULL;
char *err = NULL;
char *page_server_connstring = NULL;
PQconninfoOption *conn_options;
PQconninfoOption *conn_option;
MemoryContext oldcontext;
/*
* Here we substitute password in connection string with an environment
* variable. To simplify things we construct a connection string back with
* only known options. In particular: host port user and password. We do
* not currently use other options and constructing full connstring in an
* URI shape is quite messy.
*/
if (page_server_connstring_raw == NULL || page_server_connstring_raw[0] == '\0')
return NULL;
/* extract the auth token from the connection string */
conn_options = PQconninfoParse(page_server_connstring_raw, &err);
if (conn_options == NULL)
{
/* The error string is malloc'd, so we must free it explicitly */
char *errcopy = err ? pstrdup(err) : "out of memory";
PQfreemem(err);
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid connection string syntax: %s", errcopy)));
}
/*
* Trying to populate pageserver connection string with auth token from
* environment. We are looking for password in with placeholder value like
* $ENV_VAR_NAME, so if password field is present and starts with $ we try
* to fetch environment variable value and fail loudly if it is not set.
*/
for (conn_option = conn_options; conn_option->keyword != NULL; conn_option++)
{
if (strcmp(conn_option->keyword, "host") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
host = conn_option->val;
}
else if (strcmp(conn_option->keyword, "port") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
port = conn_option->val;
}
else if (strcmp(conn_option->keyword, "user") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
user = conn_option->val;
}
else if (strcmp(conn_option->keyword, "password") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
{
/* ensure that this is a template */
if (strncmp(conn_option->val, "$", 1) != 0)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("expected placeholder value in pageserver password starting from $ but found: %s", &conn_option->val[1])));
neon_log(LOG, "found auth token placeholder in pageserver conn string '%s'", &conn_option->val[1]);
auth_token = getenv(&conn_option->val[1]);
if (!auth_token)
{
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("cannot get auth token, environment variable %s is not set", &conn_option->val[1])));
}
else
{
neon_log(LOG, "using auth token from environment passed via env");
}
}
}
}
/*
* allocate connection string in TopMemoryContext to make sure it is not
* freed
*/
oldcontext = CurrentMemoryContext;
MemoryContextSwitchTo(TopMemoryContext);
page_server_connstring = psprintf("postgresql://%s:%s@%s:%s", user, auth_token ? auth_token : "", host, port);
MemoryContextSwitchTo(oldcontext);
PQconninfoFree(conn_options);
return page_server_connstring;
}
/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring_raw,
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Zenith timelineid the server is running on",
NULL,
&zenith_timeline,
"",
PGC_POSTMASTER,
0, /* no flags required */
check_zenith_id, NULL, NULL);
DefineCustomStringVariable("neon.tenant_id",
"Neon tenantid the server is running on",
NULL,
&zenith_tenant,
"",
PGC_POSTMASTER,
0, /* no flags required */
check_zenith_id, NULL, NULL);
DefineCustomBoolVariable("neon.wal_redo",
"start in wal-redo mode",
NULL,
&wal_redo,
false,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit",
NULL,
&max_cluster_size,
-1, -1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)
neon_log(ERROR, "libpagestore already loaded");
neon_log(PageStoreTrace, "libpagestore already loaded");
page_server = &api;
/* substitute password in pageserver_connstring */
page_server_connstring = substitute_pageserver_password(page_server_connstring_raw);
/* Is there more correct way to pass CustomGUC to postgres code? */
zenith_timeline_walproposer = zenith_timeline;
zenith_tenant_walproposer = zenith_tenant;
if (wal_redo)
{
neon_log(PageStoreTrace, "set inmem_smgr hook");
smgr_hook = smgr_inmem;
smgr_init_hook = smgr_init_inmem;
}
else if (page_server_connstring && page_server_connstring[0])
{
neon_log(PageStoreTrace, "set neon_smgr hook");
smgr_hook = smgr_zenith;
smgr_init_hook = smgr_init_zenith;
dbsize_hook = zenith_dbsize;
}
}

View File

@@ -0,0 +1,413 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn* pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from libpqprop_async_read */
};
/* Prototypes for exported functions */
static char* libpqprop_error_message(WalProposerConn* conn);
static WalProposerConnStatusType libpqprop_status(WalProposerConn* conn);
static WalProposerConn* libpqprop_connect_start(char* conninfo);
static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn* conn);
static bool libpqprop_send_query(WalProposerConn* conn, char* query);
static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn* conn);
static pgsocket libpqprop_socket(WalProposerConn* conn);
static int libpqprop_flush(WalProposerConn* conn);
static void libpqprop_finish(WalProposerConn* conn);
static PGAsyncReadResult libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount);
static PGAsyncWriteResult libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size);
static bool libpqprop_blocking_write(WalProposerConn* conn, void const* buf, size_t size);
static WalProposerFunctionsType PQWalProposerFunctions = {
libpqprop_error_message,
libpqprop_status,
libpqprop_connect_start,
libpqprop_connect_poll,
libpqprop_send_query,
libpqprop_get_query_result,
libpqprop_socket,
libpqprop_flush,
libpqprop_finish,
libpqprop_async_read,
libpqprop_async_write,
libpqprop_blocking_write,
};
/* Module initialization */
void
pg_init_libpqwalproposer(void)
{
if (WalProposerFunctions != NULL)
elog(ERROR, "libpqwalproposer already loaded");
WalProposerFunctions = &PQWalProposerFunctions;
}
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn* conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
return true;
/* Otherwise, set it appropriately */
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
return false;
conn->is_nonblocking = is_nonblocking;
return true;
}
/* Exported function definitions */
static char*
libpqprop_error_message(WalProposerConn* conn)
{
return PQerrorMessage(conn->pg_conn);
}
static WalProposerConnStatusType
libpqprop_status(WalProposerConn* conn)
{
switch (PQstatus(conn->pg_conn))
{
case CONNECTION_OK:
return WP_CONNECTION_OK;
case CONNECTION_BAD:
return WP_CONNECTION_BAD;
default:
return WP_CONNECTION_IN_PROGRESS;
}
}
static WalProposerConn*
libpqprop_connect_start(char* conninfo)
{
WalProposerConn* conn;
PGconn* pg_conn;
pg_conn = PQconnectStart(conninfo);
/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully replicate the
* behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;
/*
* And in theory this allocation can fail as well, but it's incredibly unlikely if we just
* successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking mode */
conn->recvbuf = NULL;
return conn;
}
static WalProposerConnectPollStatusType
libpqprop_connect_poll(WalProposerConn* conn)
{
WalProposerConnectPollStatusType return_val;
switch (PQconnectPoll(conn->pg_conn))
{
case PGRES_POLLING_FAILED:
return_val = WP_CONN_POLLING_FAILED;
break;
case PGRES_POLLING_READING:
return_val = WP_CONN_POLLING_READING;
break;
case PGRES_POLLING_WRITING:
return_val = WP_CONN_POLLING_WRITING;
break;
case PGRES_POLLING_OK:
return_val = WP_CONN_POLLING_OK;
break;
/* There's a comment at its source about this constant being unused. We'll expect it's never
* returned. */
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/* This return is never actually reached, but it's here to make the compiler happy */
return WP_CONN_POLLING_FAILED;
default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}
return return_val;
}
static bool
libpqprop_send_query(WalProposerConn* conn, char* query)
{
/* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush */
if (!ensure_nonblocking_status(conn, false))
return false;
/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(conn->pg_conn, query))
return false;
return true;
}
static WalProposerExecStatusType
libpqprop_get_query_result(WalProposerConn* conn)
{
PGresult* result;
WalProposerExecStatusType return_val;
/* Marker variable if we need to log an unexpected success result */
char* unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;
if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;
result = PQgetResult(conn->pg_conn);
/* PQgetResult returns NULL only if getting the result was successful & there's no more of the
* result to get. */
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;
switch (PQresultStatus(result))
{
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;
/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
UNEXPECTED_SUCCESS("data-less command end");
case PGRES_TUPLES_OK:
UNEXPECTED_SUCCESS("tuples return");
case PGRES_COPY_OUT:
UNEXPECTED_SUCCESS("'Copy Out' response");
case PGRES_COPY_IN:
UNEXPECTED_SUCCESS("'Copy In' response");
case PGRES_SINGLE_TUPLE:
UNEXPECTED_SUCCESS("single tuple return");
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");
/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_PIPELINE_ABORTED:
return_val = WP_EXEC_FAILED;
break;
default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}
if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
static pgsocket
libpqprop_socket(WalProposerConn* conn)
{
return PQsocket(conn->pg_conn);
}
static int
libpqprop_flush(WalProposerConn* conn)
{
return (PQflush(conn->pg_conn));
}
static void
libpqprop_finish(WalProposerConn* conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
PQfinish(conn->pg_conn);
pfree(conn);
}
/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
static PGAsyncReadResult
libpqprop_async_read(WalProposerConn* conn, char** buf, int* amount)
{
int result;
if (conn->recvbuf != NULL)
{
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}
/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
/* The docs for PQgetCopyData list the return values as:
* 0 if the copy is still in progress, but no "complete row" is
* available
* -1 if the copy is done
* -2 if an error occured
* (> 0) if it was successful; that value is the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server failed;
* it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/* If there was actually an error, it'll be properly reported by
* calls to PQerrorMessage -- we don't have to do anything else */
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
static PGAsyncWriteResult
libpqprop_async_write(WalProposerConn* conn, void const* buf, size_t size)
{
int result;
/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;
/* The docs for PQputcopyData list the return values as:
* 1 if the data was queued,
* 0 if it was not queued because of full buffers, or
* -1 if an error occured
*/
result = PQputCopyData(conn->pg_conn, buf, size);
/* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more */
Assert(result != 0);
switch (result)
{
case 1:
/* good -- continue */
break;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/* After queueing the data, we still need to flush to get it to send.
* This might take multiple tries, but we don't want to wait around
* until it's done.
*
* PQflush has the following returns (directly quoting the docs):
* 0 if sucessful,
* 1 if it was unable to send all the data in the send queue yet
* -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn)) {
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
return PG_ASYNC_WRITE_TRY_FLUSH;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
}
}
static bool
libpqprop_blocking_write(WalProposerConn* conn, void const* buf, size_t size)
{
int result;
/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;
/* Ths function is very similar to libpqprop_async_write. For more
* information, refer to the comments there */
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;
Assert(result == 1);
/* Because the connection is non-blocking, flushing returns 0 or -1 */
if ((result = PQflush(conn->pg_conn)) == -1)
return false;
Assert(result == 0);
return true;
}

17
pgxn/neon/neon--1.0.sql Normal file
View File

@@ -0,0 +1,17 @@
\echo Use "CREATE EXTENSION neon" to load this file. \quit
CREATE FUNCTION pg_cluster_size()
RETURNS bigint
AS 'MODULE_PATHNAME', 'pg_cluster_size'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION backpressure_lsns(
OUT received_lsn pg_lsn,
OUT disk_consistent_lsn pg_lsn,
OUT remote_consistent_lsn pg_lsn
)
RETURNS record
AS 'MODULE_PATHNAME', 'backpressure_lsns'
LANGUAGE C STRICT
PARALLEL UNSAFE;

82
pgxn/neon/neon.c Normal file
View File

@@ -0,0 +1,82 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Utility functions to expose neon specific information to user
*
* IDENTIFICATION
* contrib/neon/neon.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
#include "replication/walsender.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "neon.h"
#include "walproposer.h"
PG_MODULE_MAGIC;
void _PG_init(void);
void _PG_init(void)
{
pg_init_libpagestore();
pg_init_libpqwalproposer();
pg_init_walproposer();
EmitWarningsOnPlaceholders("neon");
}
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
{
int64 size;
size = GetZenithCurrentClusterSize();
if (size == 0)
PG_RETURN_NULL();
PG_RETURN_INT64(size);
}
Datum
backpressure_lsns(PG_FUNCTION_ARGS)
{
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
Datum values[3];
bool nulls[3];
TupleDesc tupdesc;
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
tupdesc = CreateTemplateTupleDesc(3);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "received_lsn", PG_LSNOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "disk_consistent_lsn", PG_LSNOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "remote_consistent_lsn", PG_LSNOID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
values[0] = LSNGetDatum(writePtr);
values[1] = LSNGetDatum(flushPtr);
values[2] = LSNGetDatum(applyPtr);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

4
pgxn/neon/neon.control Normal file
View File

@@ -0,0 +1,4 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.0'
module_pathname = '$libdir/neon'

19
pgxn/neon/neon.h Normal file
View File

@@ -0,0 +1,19 @@
/*-------------------------------------------------------------------------
*
* neon.h
* Functions used in the initialization of this extension.
*
* IDENTIFICATION
* contrib/neon/neon.h
*
*-------------------------------------------------------------------------
*/
#ifndef NEON_H
#define NEON_H
extern void pg_init_libpagestore(void);
extern void pg_init_libpqwalproposer(void);
extern void pg_init_walproposer(void);
#endif /* NEON_H */

View File

@@ -0,0 +1,221 @@
/*-------------------------------------------------------------------------
*
* pagestore_client.h
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* contrib/neon/pagestore_client.h
*
*-------------------------------------------------------------------------
*/
#ifndef pageserver_h
#define pageserver_h
#include "postgres.h"
#include "access/xlogdefs.h"
#include "storage/relfilenode.h"
#include "storage/block.h"
#include "storage/smgr.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
typedef enum
{
/* pagestore_client -> pagestore */
T_ZenithExistsRequest = 0,
T_ZenithNblocksRequest,
T_ZenithGetPageRequest,
T_ZenithDbSizeRequest,
/* pagestore -> pagestore_client */
T_ZenithExistsResponse = 100,
T_ZenithNblocksResponse,
T_ZenithGetPageResponse,
T_ZenithErrorResponse,
T_ZenithDbSizeResponse,
} ZenithMessageTag;
/* base struct for c-style inheritance */
typedef struct
{
ZenithMessageTag tag;
} ZenithMessage;
#define messageTag(m) (((const ZenithMessage *)(m))->tag)
/*
* supertype of all the Zenith*Request structs below
*
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
* is just a hint to the server that we know there are no versions of the page
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
*/
typedef struct
{
ZenithMessageTag tag;
bool latest; /* if true, request latest page version */
XLogRecPtr lsn; /* request page version @ this LSN */
} ZenithRequest;
typedef struct
{
ZenithRequest req;
RelFileNode rnode;
ForkNumber forknum;
} ZenithExistsRequest;
typedef struct
{
ZenithRequest req;
RelFileNode rnode;
ForkNumber forknum;
} ZenithNblocksRequest;
typedef struct
{
ZenithRequest req;
Oid dbNode;
} ZenithDbSizeRequest;
typedef struct
{
ZenithRequest req;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
} ZenithGetPageRequest;
/* supertype of all the Zenith*Response structs below */
typedef struct
{
ZenithMessageTag tag;
} ZenithResponse;
typedef struct
{
ZenithMessageTag tag;
bool exists;
} ZenithExistsResponse;
typedef struct
{
ZenithMessageTag tag;
uint32 n_blocks;
} ZenithNblocksResponse;
typedef struct
{
ZenithMessageTag tag;
char page[FLEXIBLE_ARRAY_MEMBER];
} ZenithGetPageResponse;
typedef struct
{
ZenithMessageTag tag;
int64 db_size;
} ZenithDbSizeResponse;
typedef struct
{
ZenithMessageTag tag;
char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error message */
} ZenithErrorResponse;
extern StringInfoData zm_pack_request(ZenithRequest *msg);
extern ZenithResponse *zm_unpack_response(StringInfo s);
extern char *zm_to_string(ZenithMessage *msg);
/*
* API
*/
typedef struct
{
ZenithResponse *(*request) (ZenithRequest *request);
} page_server_api;
extern page_server_api *page_server;
extern char *page_server_connstring;
extern char *zenith_timeline;
extern char *zenith_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern const f_smgr *smgr_zenith(BackendId backend, RelFileNode rnode);
extern void smgr_init_zenith(void);
extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode);
extern void smgr_init_inmem(void);
extern void smgr_shutdown_inmem(void);
/* zenith storage manager functionality */
extern void zenith_init(void);
extern void zenith_open(SMgrRelation reln);
extern void zenith_close(SMgrRelation reln, ForkNumber forknum);
extern void zenith_create(SMgrRelation reln, ForkNumber forknum, bool isRedo);
extern bool zenith_exists(SMgrRelation reln, ForkNumber forknum);
extern void zenith_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
extern void zenith_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
extern void zenith_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern void zenith_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber zenith_nblocks(SMgrRelation reln, ForkNumber forknum);
extern const int64 zenith_dbsize(Oid dbNode);
extern void zenith_truncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
extern void zenith_immedsync(SMgrRelation reln, ForkNumber forknum);
/* zenith wal-redo storage manager functionality */
extern void inmem_init(void);
extern void inmem_open(SMgrRelation reln);
extern void inmem_close(SMgrRelation reln, ForkNumber forknum);
extern void inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo);
extern bool inmem_exists(SMgrRelation reln, ForkNumber forknum);
extern void inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
extern void inmem_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern bool inmem_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern void inmem_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
extern void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
extern void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
/* utils for zenith relsize cache */
extern void relsize_hash_init(void);
extern bool get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber* size);
extern void set_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size);
extern void update_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
#endif

1696
pgxn/neon/pagestore_smgr.c Normal file

File diff suppressed because it is too large Load Diff

167
pgxn/neon/relsize_cache.c Normal file
View File

@@ -0,0 +1,167 @@
/*-------------------------------------------------------------------------
*
* relsize_cache.c
* Relation size cache for better zentih performance.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/neon/relsize_cache.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "pagestore_client.h"
#include "storage/relfilenode.h"
#include "storage/smgr.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "catalog/pg_tablespace_d.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
typedef struct
{
RelFileNode rnode;
ForkNumber forknum;
} RelTag;
typedef struct
{
RelTag tag;
BlockNumber size;
} RelSizeEntry;
static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/*
* Size of a cache entry is 20 bytes. So this default will take about 1.2 MB,
* which seems reasonable.
*/
#define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024)
static void
zenith_smgr_shmem_startup(void)
{
static HASHCTL info;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize");
info.keysize = sizeof(RelTag);
info.entrysize = sizeof(RelSizeEntry);
relsize_hash = ShmemInitHash("neon_relsize",
relsize_hash_size, relsize_hash_size,
&info,
HASH_ELEM | HASH_BLOBS);
LWLockRelease(AddinShmemInitLock);
}
bool
get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber *size)
{
bool found = false;
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
tag.rnode = rnode;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_SHARED);
entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL);
if (entry != NULL)
{
*size = entry->size;
found = true;
}
LWLockRelease(relsize_lock);
}
return found;
}
void
set_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size)
{
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
tag.rnode = rnode;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
entry = hash_search(relsize_hash, &tag, HASH_ENTER, NULL);
entry->size = size;
LWLockRelease(relsize_lock);
}
}
void
update_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber size)
{
if (relsize_hash_size > 0)
{
RelTag tag;
RelSizeEntry *entry;
bool found;
tag.rnode = rnode;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
entry = hash_search(relsize_hash, &tag, HASH_ENTER, &found);
if (!found || entry->size < size)
entry->size = size;
LWLockRelease(relsize_lock);
}
}
void
forget_cached_relsize(RelFileNode rnode, ForkNumber forknum)
{
if (relsize_hash_size > 0)
{
RelTag tag;
tag.rnode = rnode;
tag.forknum = forknum;
LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
hash_search(relsize_hash, &tag, HASH_REMOVE, NULL);
LWLockRelease(relsize_lock);
}
}
void
relsize_hash_init(void)
{
DefineCustomIntVariable("neon.relsize_hash_size",
"Sets the maximum number of cached relation sizes for neon",
NULL,
&relsize_hash_size,
DEFAULT_RELSIZE_HASH_SIZE,
0,
INT_MAX,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
if (relsize_hash_size > 0)
{
RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = zenith_smgr_shmem_startup;
}
}

2403
pgxn/neon/walproposer.c Normal file

File diff suppressed because it is too large Load Diff

540
pgxn/neon/walproposer.h Normal file
View File

@@ -0,0 +1,540 @@
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "postgres.h"
#include "port.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
#include "nodes/replnodes.h"
#include "utils/uuid.h"
#include "replication/walreceiver.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single WAL message */
#define XLOG_HDR_SIZE (1+8*3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender message header */
#define XLOG_HDR_END_POS (1+8) /* offset of end position in wal sender message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occured,
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
*/
#define WL_NO_EVENTS 0
extern char* wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connect_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
typedef struct WalMessage WalMessage;
extern char *zenith_timeline_walproposer;
extern char *zenith_tenant_walproposer;
/* Possible return values from ReadPGAsync */
typedef enum
{
/* The full read was successful. buf now points to the data */
PG_ASYNC_READ_SUCCESS,
/* The read is ongoing. Wait until the connection is read-ready, then try
* again. */
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
{
/* The write fully completed */
PG_ASYNC_WRITE_SUCCESS,
/* The write started, but you'll need to call PQflush some more times
* to finish it off. We just tried, so it's best to wait until the
* connection is read- or write-ready to try again.
*
* If it becomes read-ready, call PQconsumeInput and flush again. If it
* becomes write-ready, just call PQflush.
*/
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
*
* States are listed here in the order that they're executed.
*
* Most states, upon failure, will move back to SS_OFFLINE by calls to
* ResetConnection or ShutdownConnection.
*/
typedef enum
{
/*
* Does not have an active connection and will stay that way until
* further notice.
*
* Moves to SS_CONNECTING_WRITE by calls to ResetConnection.
*/
SS_OFFLINE,
/*
* Connecting states. "_READ" waits for the socket to be available for
* reading, "_WRITE" waits for writing. There's no difference in the code
* they execute when polled, but we have this distinction in order to
* recreate the event set in HackyRemoveWalProposerEvent.
*
* After the connection is made, "START_WAL_PUSH" query is sent.
*/
SS_CONNECTING_WRITE,
SS_CONNECTING_READ,
/*
* Waiting for the result of the "START_WAL_PUSH" command.
*
* After we get a successful result, sends handshake to safekeeper.
*/
SS_WAIT_EXEC_RESULT,
/*
* Executing the receiving half of the handshake. After receiving, moves to
* SS_VOTING.
*/
SS_HANDSHAKE_RECV,
/*
* Waiting to participate in voting, but a quorum hasn't yet been reached.
* This is an idle state - we do not expect AdvancePollState to be called.
*
* Moved externally by execution of SS_HANDSHAKE_RECV, when we received a
* quorum of handshakes.
*/
SS_VOTING,
/*
* Already sent voting information, waiting to receive confirmation from the
* node. After receiving, moves to SS_IDLE, if the quorum isn't reached yet.
*/
SS_WAIT_VERDICT,
/* Need to flush ProposerElected message. */
SS_SEND_ELECTED_FLUSH,
/*
* Waiting for quorum to send WAL. Idle state. If the socket becomes
* read-ready, the connection has been closed.
*
* Moves to SS_ACTIVE only by call to StartStreaming.
*/
SS_IDLE,
/*
* Active phase, when we acquired quorum and have WAL to send or feedback
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
/* neon storage node id */
typedef uint64 NNodeId;
/*
* Proposer <-> Acceptor messaging.
*/
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
uint32 pgVersion;
pg_uuid_t proposerId;
uint64 systemId; /* Postgres system identifier */
uint8 ztimelineid[16]; /* Zenith timeline id */
uint8 ztenantid[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
*/
typedef struct VoteRequest
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse {
AcceptorProposerMessage apm;
term_t term;
uint64 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
* proposer to choose the most advanced one.
*/
XLogRecPtr flushLsn;
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
* epoch history to it.
*/
typedef struct ProposerElected
{
uint64 tag;
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
uint64 tag;
term_t term; /* term of the proposer */
/*
* LSN since which current proposer appends WAL (begin_lsn of its first
* record); determines epoch switch point.
*/
XLogRecPtr epochStartLsn;
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end lsn
* + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
*/
typedef struct HotStandbyFeedback
{
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
typedef struct ReplicationFeedback
{
// current size of the timeline on pageserver
uint64 currentClusterSize;
// standby_status_update fields that safekeeper received from pageserver
XLogRecPtr ps_writelsn;
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
TimestampTz ps_replytime;
} ReplicationFeedback;
typedef struct WalproposerShmemState
{
slock_t mutex;
ReplicationFeedback feedback;
term_t mineLastElectedTerm;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
*/
typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
*/
term_t term;
// TODO: add comment
XLogRecPtr flushLsn;
// Safekeeper reports back his awareness about which WAL is committed, as
// this is a criterion for walproposer --sync mode exit
XLogRecPtr commitLsn;
HotStandbyFeedback hs;
// Feedback recieved from pageserver includes standby_status_update fields
// and custom zenith feedback.
// This part of the message is extensible.
ReplicationFeedback rf;
} AppendResponse;
// ReplicationFeedback is extensible part of the message that is parsed separately
// Other fields are fixed part
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
char const* host;
char const* port;
char conninfo[MAXCONNINFO]; /* connection info for connecting/reconnecting */
/*
* postgres protocol connection to the WAL acceptor
*
* Equals NULL only when state = SS_OFFLINE. Nonblocking is set once we
* reach SS_ACTIVE; not before.
*/
WalProposerConn* conn;
/*
* Temporary buffer for the message being sent to the safekeeper.
*/
StringInfoData outbuf;
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState* xlogreader;
/*
* Streaming will start here; must be record boundary.
*/
XLogRecPtr startStreamingAt;
bool flushWrite; /* set to true if we need to call AsyncFlush, to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
int eventPos; /* position in wait event set. Equal to -1 if no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz startedConnAt; /* when connection attempt started */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern PGDLLIMPORT void WalProposerMain(Datum main_arg);
void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
void WalProposerPoll(void);
void WalProposerRegister(void);
void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
Size WalproposerShmemSize(void);
bool WalproposerShmemInit(void);
void replication_feedback_set(ReplicationFeedback *rf);
void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
/* Re-exported PostgresPollingStatusType */
typedef enum
{
WP_CONN_POLLING_FAILED = 0,
WP_CONN_POLLING_READING,
WP_CONN_POLLING_WRITING,
WP_CONN_POLLING_OK,
/*
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
{
/* We received a single CopyBoth result */
WP_EXEC_SUCCESS_COPYBOTH,
/* Any success result other than a single CopyBoth was received. The specifics of the result
* were already logged, but it may be useful to provide an error message indicating which
* safekeeper messed up.
*
* Do not expect PQerrorMessage to be appropriately set. */
WP_EXEC_UNEXPECTED_SUCCESS,
/* No result available at this time. Wait until read-ready, then call again. Internally, this is
* returned when PQisBusy indicates that PQgetResult would block. */
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
{
WP_CONNECTION_OK,
WP_CONNECTION_BAD,
/*
* The original ConnStatusType has many more tags, but requests that
* they not be relied upon (except for displaying to the user). We
* don't need that extra functionality, so we collect them into a
* single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
typedef char* (*walprop_error_message_fn) (WalProposerConn* conn);
/* Re-exported PQstatus */
typedef WalProposerConnStatusType (*walprop_status_fn) (WalProposerConn* conn);
/* Re-exported PQconnectStart */
typedef WalProposerConn* (*walprop_connect_start_fn) (char* conninfo);
/* Re-exported PQconectPoll */
typedef WalProposerConnectPollStatusType (*walprop_connect_poll_fn) (WalProposerConn* conn);
/* Blocking wrapper around PQsendQuery */
typedef bool (*walprop_send_query_fn) (WalProposerConn* conn, char* query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
typedef WalProposerExecStatusType (*walprop_get_query_result_fn) (WalProposerConn* conn);
/* Re-exported PQsocket */
typedef pgsocket (*walprop_socket_fn) (WalProposerConn* conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
typedef int (*walprop_flush_fn) (WalProposerConn* conn);
/* Re-exported PQfinish */
typedef void (*walprop_finish_fn) (WalProposerConn* conn);
/*
* Ergonomic wrapper around PGgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
typedef PGAsyncReadResult (*walprop_async_read_fn) (WalProposerConn* conn,
char** buf,
int* amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
typedef PGAsyncWriteResult (*walprop_async_write_fn) (WalProposerConn* conn,
void const* buf,
size_t size);
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
typedef bool (*walprop_blocking_write_fn) (WalProposerConn* conn, void const* buf, size_t size);
/* All libpqwalproposer exported functions collected together. */
typedef struct WalProposerFunctionsType
{
walprop_error_message_fn walprop_error_message;
walprop_status_fn walprop_status;
walprop_connect_start_fn walprop_connect_start;
walprop_connect_poll_fn walprop_connect_poll;
walprop_send_query_fn walprop_send_query;
walprop_get_query_result_fn walprop_get_query_result;
walprop_socket_fn walprop_socket;
walprop_flush_fn walprop_flush;
walprop_finish_fn walprop_finish;
walprop_async_read_fn walprop_async_read;
walprop_async_write_fn walprop_async_write;
walprop_blocking_write_fn walprop_blocking_write;
} WalProposerFunctionsType;
/* Allow the above functions to be "called" with normal syntax */
#define walprop_error_message(conn) \
WalProposerFunctions->walprop_error_message(conn)
#define walprop_status(conn) \
WalProposerFunctions->walprop_status(conn)
#define walprop_connect_start(conninfo) \
WalProposerFunctions->walprop_connect_start(conninfo)
#define walprop_connect_poll(conn) \
WalProposerFunctions->walprop_connect_poll(conn)
#define walprop_send_query(conn, query) \
WalProposerFunctions->walprop_send_query(conn, query)
#define walprop_get_query_result(conn) \
WalProposerFunctions->walprop_get_query_result(conn)
#define walprop_set_nonblocking(conn, arg) \
WalProposerFunctions->walprop_set_nonblocking(conn, arg)
#define walprop_socket(conn) \
WalProposerFunctions->walprop_socket(conn)
#define walprop_flush(conn) \
WalProposerFunctions->walprop_flush(conn)
#define walprop_finish(conn) \
WalProposerFunctions->walprop_finish(conn)
#define walprop_async_read(conn, buf, amount) \
WalProposerFunctions->walprop_async_read(conn, buf, amount)
#define walprop_async_write(conn, buf, size) \
WalProposerFunctions->walprop_async_write(conn, buf, size)
#define walprop_blocking_write(conn, buf, size) \
WalProposerFunctions->walprop_blocking_write(conn, buf, size)
/*
* The runtime location of the libpqwalproposer functions.
*
* This pointer is set by the initializer in libpqwalproposer, so that we
* can use it later.
*/
extern PGDLLIMPORT WalProposerFunctionsType *WalProposerFunctions;
#endif /* __NEON_WALPROPOSER_H__ */

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,19 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char* FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper* sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char* FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */

View File

@@ -0,0 +1,15 @@
# pgxs/neon_test_utils/Makefile
MODULE_big = neon_test_utils
OBJS = \
$(WIN32RES) \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -0,0 +1,29 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION neon_test_utils" to load this file. \quit
CREATE FUNCTION test_consume_xids(nxids int)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_consume_xids'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION clear_buffer_cache()
RETURNS VOID
AS 'MODULE_PATHNAME', 'clear_buffer_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, lsn pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, lsn pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -0,0 +1,5 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.0'
module_pathname = '$libdir/neon_test_utils'
relocatable = true

View File

@@ -0,0 +1,304 @@
/*-------------------------------------------------------------------------
*
* neontest.c
* Helpers for neon testing and debugging
*
* IDENTIFICATION
* contrib/neon_test_utils/neontest.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/relation.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "../neon/pagestore_client.h"
PG_MODULE_MAGIC;
extern void _PG_init(void);
PG_FUNCTION_INFO_V1(test_consume_xids);
PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
/*
* Linkage to functions in zenith module.
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
*/
typedef void (*zenith_read_at_lsn_type)(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
static zenith_read_at_lsn_type zenith_read_at_lsn_ptr;
/*
* Module initialize function: fetch function pointers for cross-module calls.
*/
void
_PG_init(void)
{
/* Asserts verify that typedefs above match original declarations */
AssertVariableIsOfType(&zenith_read_at_lsn, zenith_read_at_lsn_type);
zenith_read_at_lsn_ptr = (zenith_read_at_lsn_type)
load_external_function("$libdir/neon", "zenith_read_at_lsn",
true, NULL);
}
#define zenith_read_at_lsn zenith_read_at_lsn_ptr
/*
* test_consume_xids(int4), for rapidly consuming XIDs, to test wraparound.
*/
Datum
test_consume_xids(PG_FUNCTION_ARGS)
{
int32 nxids = PG_GETARG_INT32(0);
TransactionId topxid;
FullTransactionId fullxid;
TransactionId xid;
TransactionId targetxid;
/* make sure we have a top-XID first */
topxid = GetTopTransactionId();
xid = ReadNextTransactionId();
targetxid = xid + nxids;
while (targetxid < FirstNormalTransactionId)
targetxid++;
while (TransactionIdPrecedes(xid, targetxid))
{
fullxid = GetNewTransactionId(true);
xid = XidFromFullTransactionId(fullxid);
elog(DEBUG1, "topxid: %u xid: %u", topxid, xid);
}
PG_RETURN_VOID();
}
/*
* Flush the buffer cache, evicting all pages that are not currently pinned.
*/
Datum
clear_buffer_cache(PG_FUNCTION_ARGS)
{
bool save_zenith_test_evict;
/*
* Temporarily set the zenith_test_evict GUC, so that when we pin and
* unpin a buffer, the buffer is evicted. We use that hack to evict all
* buffers, as there is no explicit "evict this buffer" function in the
* buffer manager.
*/
save_zenith_test_evict = zenith_test_evict;
zenith_test_evict = true;
PG_TRY();
{
/* Scan through all the buffers */
for (int i = 0; i < NBuffers; i++)
{
BufferDesc *bufHdr;
uint32 buf_state;
Buffer bufferid;
bool isvalid;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blocknum;
/* Peek into the buffer header to see what page it holds. */
bufHdr = GetBufferDescriptor(i);
buf_state = LockBufHdr(bufHdr);
if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID))
isvalid = true;
else
isvalid = false;
bufferid = BufferDescriptorGetBuffer(bufHdr);
rnode = bufHdr->tag.rnode;
forknum = bufHdr->tag.forkNum;
blocknum = bufHdr->tag.blockNum;
UnlockBufHdr(bufHdr, buf_state);
/*
* Pin the buffer, and release it again. Because we have
* zenith_test_evict==true, this will evict the page from
* the buffer cache if no one else is holding a pin on it.
*/
if (isvalid)
{
if (ReadRecentBuffer(rnode, forknum, blocknum, bufferid))
ReleaseBuffer(bufferid);
}
}
}
PG_FINALLY();
{
/* restore the GUC */
zenith_test_evict = save_zenith_test_evict;
}
PG_END_TRY();
PG_RETURN_VOID();
}
/*
* Reads the page from page server without buffer cache
* usage mimics get_raw_page() in pageinspect, but offers reading versions at specific LSN
* NULL read lsn will result in reading the latest version.
*
* Note: reading latest version will result in waiting for latest changes to reach the page server,
* if this is undesirable, use pageinspect' get_raw_page that uses buffered access to the latest page
*/
Datum
get_raw_page_at_lsn(PG_FUNCTION_ARGS)
{
bytea *raw_page;
ForkNumber forknum;
RangeVar *relrv;
Relation rel;
char *raw_page_data;
text *relname;
text *forkname;
uint32 blkno;
bool request_latest = PG_ARGISNULL(3);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
PG_RETURN_NULL();
relname = PG_GETARG_TEXT_PP(0);
forkname = PG_GETARG_TEXT_PP(1);
blkno = PG_GETARG_UINT32(2);
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
rel = relation_openrv(relrv, AccessShareLock);
/* Check that this relation has storage */
if (rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot get raw page from view \"%s\"",
RelationGetRelationName(rel))));
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot get raw page from composite type \"%s\"",
RelationGetRelationName(rel))));
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot get raw page from foreign table \"%s\"",
RelationGetRelationName(rel))));
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot get raw page from partitioned table \"%s\"",
RelationGetRelationName(rel))));
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot get raw page from partitioned index \"%s\"",
RelationGetRelationName(rel))));
/*
* Reject attempts to read non-local temporary relations; we would be
* likely to get wrong data since we have no visibility into the owning
* session's local buffers.
*/
if (RELATION_IS_OTHER_TEMP(rel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot access temporary tables of other sessions")));
forknum = forkname_to_number(text_to_cstring(forkname));
/* Initialize buffer to copy to */
raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
zenith_read_at_lsn(rel->rd_node, forknum, blkno, read_lsn, request_latest, raw_page_data);
relation_close(rel, AccessShareLock);
PG_RETURN_BYTEA_P(raw_page);
}
/*
* Another option to read a relation page from page server without cache
* this version doesn't validate input and allows reading blocks of dropped relations
*
* Note: reading latest version will result in waiting for latest changes to reach the page server,
* if this is undesirable, use pageinspect' get_raw_page that uses buffered access to the latest page
*/
Datum
get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
{
char *raw_page_data;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) ||
PG_ARGISNULL(3) || PG_ARGISNULL(4))
PG_RETURN_NULL();
{
RelFileNode rnode = {
.spcNode = PG_GETARG_OID(0),
.dbNode = PG_GETARG_OID(1),
.relNode = PG_GETARG_OID(2)
};
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);
bool request_latest = PG_ARGISNULL(5);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
/* Initialize buffer to copy to */
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
zenith_read_at_lsn(rnode, forknum, blkno, read_lsn, request_latest, raw_page_data);
PG_RETURN_BYTEA_P(raw_page);
}
}
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*/
Datum
neon_xlogflush(PG_FUNCTION_ARGS)
{
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogFlush(lsn);
PG_RETURN_VOID();
}