mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Add 'neon_seqscan_rel' test function, to test sequential scan performance.
Usage:
postgres=# \timing
Timing is on.
postgres=# select neon_seqscan_rel('pgbench_accounts', 1000);
INFO: scanning 491804 blocks, prefetch 1000
INFO: blk 0/491804
INFO: blk 1024/491804
INFO: blk 2048/491804
INFO: blk 3072/491804
...
INFO: blk 489472/491804
INFO: blk 490496/491804
INFO: blk 491520/491804
neon_seqscan_rel
------------------
(1 row)
Time: 57517.979 ms (00:57.518)
The second argument to the function is the number of pages to prefetch.
Note: the prefetching in this function works differently from the
prefetching we have for sequential scans in 'main'. After receiving the
result for a block, it immediately sends the request for the next page;
it doesn't send them in batches like 'main' does.
This commit is contained in:
@@ -10,6 +10,9 @@ EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.0.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
|
||||
@@ -23,6 +23,11 @@ RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
-- neon_seqscan_rel(rel, nprefetch): SQL-callable entry point for the C
-- function 'neon_seqscan_rel' in this extension's shared library.
-- Takes the relation as a regclass and the number of pages to prefetch
-- (defaults to 0 when omitted); returns nothing.
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||
|
||||
@@ -23,8 +23,13 @@
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/varlena.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "../neon/pagestore_client.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "libpq/libpq.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
extern void _PG_init(void);
|
||||
@@ -34,6 +39,7 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
|
||||
|
||||
/*
|
||||
* Linkage to functions in neon module.
|
||||
@@ -289,6 +295,238 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(PGconn *conn, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
retry:
|
||||
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
|
||||
|
||||
if (ret == 0)
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
elog(ERROR, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
goto retry;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
|
||||
|
||||
/*
|
||||
* Fetch all pages of given relation. This simulates a sequential scan
|
||||
* over the table. You can specify the number of blocks to prefetch;
|
||||
* the function will try to keep that many requests "in flight" at all
|
||||
* times. The fetched pages are simply discarded.
|
||||
*/
|
||||
Datum
|
||||
neon_seqscan_rel(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relid = PG_GETARG_OID(0);
|
||||
Oid nprefetch = PG_GETARG_INT32(1);
|
||||
Relation rel;
|
||||
char *raw_page_data;
|
||||
BlockNumber nblocks;
|
||||
PGconn *pageserver_conn;
|
||||
XLogRecPtr read_lsn;
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to use raw page functions")));
|
||||
|
||||
rel = relation_open(relid, AccessShareLock);
|
||||
|
||||
nblocks = RelationGetNumberOfBlocks(rel);
|
||||
|
||||
pageserver_conn = PQconnectdb(page_server_connstring);
|
||||
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg("could not establish connection to pageserver"),
|
||||
errdetail_internal("%s", msg)));
|
||||
}
|
||||
PG_TRY();
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
StringInfoData resp_buff;
|
||||
|
||||
read_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
ret = PQsendQuery(pageserver_conn, query);
|
||||
if (ret != 1)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
elog(ERROR, "could not send pagestream command to pageserver");
|
||||
}
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(pageserver_conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
|
||||
elog(ERROR, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
|
||||
|
||||
BlockNumber nsent = 0;
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = read_lsn,
|
||||
.rnode = rel->rd_node,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
NeonResponse *resp;
|
||||
|
||||
if (blkno % 1024 == 0)
|
||||
elog(INFO, "blk %u/%u", blkno, nblocks);
|
||||
|
||||
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
{
|
||||
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
|
||||
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to flush page requests: %s", msg);
|
||||
}
|
||||
}
|
||||
|
||||
/* read response */
|
||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||
resp_buff.cursor = 0;
|
||||
|
||||
if (resp_buff.len < 0)
|
||||
{
|
||||
if (resp_buff.len == -1)
|
||||
elog(ERROR, "end of COPY");
|
||||
else if (resp_buff.len == -2)
|
||||
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
resp = nm_unpack_response(&resp_buff);
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
/* ok */
|
||||
break;
|
||||
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read block %u", blkno),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
PQfreemem(resp_buff.data);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
relation_close(rel, AccessShareLock);
|
||||
}
|
||||
|
||||
static void
|
||||
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
StringInfoData req_buff;
|
||||
|
||||
req_buff = nm_pack_request(&request.req);
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to send page request: %s", msg);
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user