mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 09:20:38 +00:00
Compare commits
1 Commits
split-prox
...
seqscan-pe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6ec0456e8 |
@@ -10,6 +10,9 @@ EXTENSION = neon_test_utils
|
|||||||
DATA = neon_test_utils--1.0.sql
|
DATA = neon_test_utils--1.0.sql
|
||||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||||
|
|
||||||
|
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||||
|
SHLIB_LINK_INTERNAL = $(libpq)
|
||||||
|
|
||||||
PG_CONFIG = pg_config
|
PG_CONFIG = pg_config
|
||||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||||
include $(PGXS)
|
include $(PGXS)
|
||||||
|
|||||||
@@ -23,6 +23,11 @@ RETURNS bytea
|
|||||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||||
LANGUAGE C PARALLEL UNSAFE;
|
LANGUAGE C PARALLEL UNSAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
|
||||||
|
RETURNS void
|
||||||
|
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
|
||||||
|
LANGUAGE C PARALLEL UNSAFE;
|
||||||
|
|
||||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
||||||
RETURNS VOID
|
RETURNS VOID
|
||||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||||
|
|||||||
@@ -23,8 +23,13 @@
|
|||||||
#include "utils/pg_lsn.h"
|
#include "utils/pg_lsn.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/varlena.h"
|
#include "utils/varlena.h"
|
||||||
|
#include "utils/wait_event.h"
|
||||||
#include "../neon/pagestore_client.h"
|
#include "../neon/pagestore_client.h"
|
||||||
|
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "libpq/pqformat.h"
|
||||||
|
#include "libpq/libpq.h"
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
extern void _PG_init(void);
|
extern void _PG_init(void);
|
||||||
@@ -34,6 +39,7 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
|||||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||||
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||||
|
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Linkage to functions in neon module.
|
* Linkage to functions in neon module.
|
||||||
@@ -289,6 +295,238 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
call_PQgetCopyData(PGconn *conn, char **buffer)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
retry:
|
||||||
|
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
|
||||||
|
|
||||||
|
if (ret == 0)
|
||||||
|
{
|
||||||
|
int wc;
|
||||||
|
|
||||||
|
/* Sleep until there's something to do */
|
||||||
|
wc = WaitLatchOrSocket(MyLatch,
|
||||||
|
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||||
|
WL_EXIT_ON_PM_DEATH,
|
||||||
|
PQsocket(conn),
|
||||||
|
-1L, PG_WAIT_EXTENSION);
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
/* Data available in socket? */
|
||||||
|
if (wc & WL_SOCKET_READABLE)
|
||||||
|
{
|
||||||
|
if (!PQconsumeInput(conn))
|
||||||
|
elog(ERROR, "could not get response from pageserver: %s",
|
||||||
|
PQerrorMessage(conn));
|
||||||
|
}
|
||||||
|
|
||||||
|
goto retry;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fetch all pages of given relation. This simulates a sequential scan
|
||||||
|
* over the table. You can specify the number of blocks to prefetch;
|
||||||
|
* the function will try to keep that many requests "in flight" at all
|
||||||
|
* times. The fetched pages are simply discarded.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
neon_seqscan_rel(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Oid relid = PG_GETARG_OID(0);
|
||||||
|
Oid nprefetch = PG_GETARG_INT32(1);
|
||||||
|
Relation rel;
|
||||||
|
char *raw_page_data;
|
||||||
|
BlockNumber nblocks;
|
||||||
|
PGconn *pageserver_conn;
|
||||||
|
XLogRecPtr read_lsn;
|
||||||
|
|
||||||
|
if (!superuser())
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||||
|
errmsg("must be superuser to use raw page functions")));
|
||||||
|
|
||||||
|
rel = relation_open(relid, AccessShareLock);
|
||||||
|
|
||||||
|
nblocks = RelationGetNumberOfBlocks(rel);
|
||||||
|
|
||||||
|
pageserver_conn = PQconnectdb(page_server_connstring);
|
||||||
|
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||||
|
{
|
||||||
|
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||||
|
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||||
|
errmsg("could not establish connection to pageserver"),
|
||||||
|
errdetail_internal("%s", msg)));
|
||||||
|
}
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
char *query;
|
||||||
|
int ret;
|
||||||
|
StringInfoData resp_buff;
|
||||||
|
|
||||||
|
read_lsn = GetXLogInsertRecPtr();
|
||||||
|
|
||||||
|
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||||
|
ret = PQsendQuery(pageserver_conn, query);
|
||||||
|
if (ret != 1)
|
||||||
|
{
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
pageserver_conn = NULL;
|
||||||
|
elog(ERROR, "could not send pagestream command to pageserver");
|
||||||
|
}
|
||||||
|
|
||||||
|
while (PQisBusy(pageserver_conn))
|
||||||
|
{
|
||||||
|
int wc;
|
||||||
|
|
||||||
|
/* Sleep until there's something to do */
|
||||||
|
wc = WaitLatchOrSocket(MyLatch,
|
||||||
|
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||||
|
WL_EXIT_ON_PM_DEATH,
|
||||||
|
PQsocket(pageserver_conn),
|
||||||
|
-1L, PG_WAIT_EXTENSION);
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
/* Data available in socket? */
|
||||||
|
if (wc & WL_SOCKET_READABLE)
|
||||||
|
{
|
||||||
|
if (!PQconsumeInput(pageserver_conn))
|
||||||
|
{
|
||||||
|
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||||
|
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
pageserver_conn = NULL;
|
||||||
|
|
||||||
|
elog(ERROR, "could not complete handshake with pageserver: %s",
|
||||||
|
msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
|
||||||
|
|
||||||
|
BlockNumber nsent = 0;
|
||||||
|
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
|
||||||
|
{
|
||||||
|
NeonGetPageRequest request = {
|
||||||
|
.req.tag = T_NeonGetPageRequest,
|
||||||
|
.req.latest = true,
|
||||||
|
.req.lsn = read_lsn,
|
||||||
|
.rnode = rel->rd_node,
|
||||||
|
.forknum = MAIN_FORKNUM,
|
||||||
|
.blkno = blkno
|
||||||
|
};
|
||||||
|
NeonResponse *resp;
|
||||||
|
|
||||||
|
if (blkno % 1024 == 0)
|
||||||
|
elog(INFO, "blk %u/%u", blkno, nblocks);
|
||||||
|
|
||||||
|
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||||
|
{
|
||||||
|
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||||
|
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
|
||||||
|
|
||||||
|
if (PQflush(pageserver_conn))
|
||||||
|
{
|
||||||
|
char *msg = PQerrorMessage(pageserver_conn);
|
||||||
|
|
||||||
|
elog(ERROR, "failed to flush page requests: %s", msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* read response */
|
||||||
|
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||||
|
resp_buff.cursor = 0;
|
||||||
|
|
||||||
|
if (resp_buff.len < 0)
|
||||||
|
{
|
||||||
|
if (resp_buff.len == -1)
|
||||||
|
elog(ERROR, "end of COPY");
|
||||||
|
else if (resp_buff.len == -2)
|
||||||
|
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||||
|
}
|
||||||
|
resp = nm_unpack_response(&resp_buff);
|
||||||
|
|
||||||
|
switch (resp->tag)
|
||||||
|
{
|
||||||
|
case T_NeonGetPageResponse:
|
||||||
|
/* ok */
|
||||||
|
break;
|
||||||
|
|
||||||
|
case T_NeonErrorResponse:
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_IO_ERROR),
|
||||||
|
errmsg("could not read block %u", blkno),
|
||||||
|
errdetail("page server returned error: %s",
|
||||||
|
((NeonErrorResponse *) resp)->message)));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
PQfreemem(resp_buff.data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
relation_close(rel, AccessShareLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
|
||||||
|
{
|
||||||
|
NeonGetPageRequest request = {
|
||||||
|
.req.tag = T_NeonGetPageRequest,
|
||||||
|
.req.latest = true,
|
||||||
|
.req.lsn = lsn,
|
||||||
|
.rnode = rnode,
|
||||||
|
.forknum = MAIN_FORKNUM,
|
||||||
|
.blkno = blkno
|
||||||
|
};
|
||||||
|
StringInfoData req_buff;
|
||||||
|
|
||||||
|
req_buff = nm_pack_request(&request.req);
|
||||||
|
/*
|
||||||
|
* Send request.
|
||||||
|
*
|
||||||
|
* In principle, this could block if the output buffer is full, and we
|
||||||
|
* should use async mode and check for interrupts while waiting. In
|
||||||
|
* practice, our requests are small enough to always fit in the output and
|
||||||
|
* TCP buffer.
|
||||||
|
*/
|
||||||
|
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||||
|
{
|
||||||
|
char *msg = PQerrorMessage(pageserver_conn);
|
||||||
|
|
||||||
|
elog(ERROR, "failed to send page request: %s", msg);
|
||||||
|
}
|
||||||
|
pfree(req_buff.data);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user