diff --git a/pgxn/neon_test_utils/Makefile b/pgxn/neon_test_utils/Makefile index 9c774ec185..4aeb85bf4e 100644 --- a/pgxn/neon_test_utils/Makefile +++ b/pgxn/neon_test_utils/Makefile @@ -10,6 +10,9 @@ EXTENSION = neon_test_utils DATA = neon_test_utils--1.0.sql PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging" +PG_CPPFLAGS = -I$(libpq_srcdir) +SHLIB_LINK_INTERNAL = $(libpq) + PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) include $(PGXS) diff --git a/pgxn/neon_test_utils/neon_test_utils--1.0.sql b/pgxn/neon_test_utils/neon_test_utils--1.0.sql index 402981a9a6..843c4e5ab7 100644 --- a/pgxn/neon_test_utils/neon_test_utils--1.0.sql +++ b/pgxn/neon_test_utils/neon_test_utils--1.0.sql @@ -23,6 +23,11 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex' LANGUAGE C PARALLEL UNSAFE; +CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0) +RETURNS void +AS 'MODULE_PATHNAME', 'neon_seqscan_rel' +LANGUAGE C PARALLEL UNSAFE; + CREATE FUNCTION neon_xlogflush(lsn pg_lsn) RETURNS VOID AS 'MODULE_PATHNAME', 'neon_xlogflush' diff --git a/pgxn/neon_test_utils/neontest.c b/pgxn/neon_test_utils/neontest.c index e0cea4177b..6abe30d22b 100644 --- a/pgxn/neon_test_utils/neontest.c +++ b/pgxn/neon_test_utils/neontest.c @@ -23,8 +23,13 @@ #include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/varlena.h" +#include "utils/wait_event.h" #include "../neon/pagestore_client.h" +#include "libpq-fe.h" +#include "libpq/pqformat.h" +#include "libpq/libpq.h" + PG_MODULE_MAGIC; extern void _PG_init(void); @@ -34,6 +39,7 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache); PG_FUNCTION_INFO_V1(get_raw_page_at_lsn); PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex); PG_FUNCTION_INFO_V1(neon_xlogflush); +PG_FUNCTION_INFO_V1(neon_seqscan_rel); /* * Linkage to functions in neon module. @@ -289,6 +295,238 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS) } } + +/* + * A wrapper around PQgetCopyData that checks for interrupts while sleeping. + */ +static int +call_PQgetCopyData(PGconn *conn, char **buffer) +{ + int ret; + +retry: + ret = PQgetCopyData(conn, buffer, 1 /* async */ ); + + if (ret == 0) + { + int wc; + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_EXIT_ON_PM_DEATH, + PQsocket(conn), + -1L, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + elog(ERROR, "could not get response from pageserver: %s", + PQerrorMessage(conn)); + } + + goto retry; + } + + return ret; +} + +static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn); + +/* + * Fetch all pages of given relation. This simulates a sequential scan + * over the table. You can specify the number of blocks to prefetch; + * the function will try to keep that many requests "in flight" at all + * times. The fetched pages are simply discarded. + */ +Datum +neon_seqscan_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Oid nprefetch = PG_GETARG_INT32(1); + Relation rel; + char *raw_page_data; + BlockNumber nblocks; + PGconn *pageserver_conn; + XLogRecPtr read_lsn; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to use raw page functions"))); + + rel = relation_open(relid, AccessShareLock); + + nblocks = RelationGetNumberOfBlocks(rel); + + pageserver_conn = PQconnectdb(page_server_connstring); + if (PQstatus(pageserver_conn) == CONNECTION_BAD) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + + PQfinish(pageserver_conn); + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection to pageserver"), + errdetail_internal("%s", msg))); + } + PG_TRY(); + { + char *query; + int ret; + StringInfoData resp_buff; + + read_lsn = GetXLogInsertRecPtr(); + + query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); + ret = PQsendQuery(pageserver_conn, query); + if (ret != 1) + { + PQfinish(pageserver_conn); + pageserver_conn = NULL; + elog(ERROR, "could not send pagestream command to pageserver"); + } + + while (PQisBusy(pageserver_conn)) + { + int wc; + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_EXIT_ON_PM_DEATH, + PQsocket(pageserver_conn), + -1L, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(pageserver_conn)) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + + PQfinish(pageserver_conn); + pageserver_conn = NULL; + + elog(ERROR, "could not complete handshake with pageserver: %s", + msg); + } + } + } + + elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch); + + BlockNumber nsent = 0; + for (BlockNumber blkno = 0; blkno < nblocks; blkno++) + { + NeonGetPageRequest request = { + .req.tag = T_NeonGetPageRequest, + .req.latest = true, + .req.lsn = read_lsn, + .rnode = rel->rd_node, + .forknum = MAIN_FORKNUM, + .blkno = blkno + }; + NeonResponse *resp; + + if (blkno % 1024 == 0) + elog(INFO, "blk %u/%u", blkno, nblocks); + + if (nsent < blkno + nprefetch + 1 && nsent < nblocks) + { + while (nsent < blkno + nprefetch + 1 && nsent < nblocks) + send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn); + + if (PQflush(pageserver_conn)) + { + char *msg = PQerrorMessage(pageserver_conn); + + elog(ERROR, "failed to flush page requests: %s", msg); + } + } + + /* read response */ + resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data); + resp_buff.cursor = 0; + + if (resp_buff.len < 0) + { + if (resp_buff.len == -1) + elog(ERROR, "end of COPY"); + else if (resp_buff.len == -2) + elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn)); + } + resp = nm_unpack_response(&resp_buff); + + switch (resp->tag) + { + case T_NeonGetPageResponse: + /* ok */ + break; + + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg("could not read block %u", blkno), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; + + default: + elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + } + + PQfreemem(resp_buff.data); + } + } + PG_CATCH(); + { + PQfinish(pageserver_conn); + PG_RE_THROW(); + } + PG_END_TRY(); + + relation_close(rel, AccessShareLock); +} + +static void +send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn) +{ + NeonGetPageRequest request = { + .req.tag = T_NeonGetPageRequest, + .req.latest = true, + .req.lsn = lsn, + .rnode = rnode, + .forknum = MAIN_FORKNUM, + .blkno = blkno + }; + StringInfoData req_buff; + + req_buff = nm_pack_request(&request.req); + /* + * Send request. + * + * In principle, this could block if the output buffer is full, and we + * should use async mode and check for interrupts while waiting. In + * practice, our requests are small enough to always fit in the output and + * TCP buffer. + */ + if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) + { + char *msg = PQerrorMessage(pageserver_conn); + + elog(ERROR, "failed to send page request: %s", msg); + } + pfree(req_buff.data); +} + /* * Directly calls XLogFlush(lsn) to flush WAL buffers. */