Add vendor/postgres-v16 and v16 support in pgxn/neon

This commit is contained in:
Anastasia Lubennikova
2023-07-17 17:51:27 +03:00
parent 13bd44a1f0
commit 12f0c0ec8f
11 changed files with 408 additions and 152 deletions

View File

@@ -25,7 +25,11 @@
#include "pagestore_client.h"
#include "access/parallel.h"
#include "postmaster/bgworker.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
#include "storage/buf_internals.h"
#include "storage/latch.h"
#include "storage/ipc.h"
@@ -39,6 +43,7 @@
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
* All blocks of all relations are stored inside one file and addressed using shared hash map.
@@ -360,9 +365,12 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false;
tag.rnode = rnode;
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -387,7 +395,11 @@ lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag);
@@ -457,10 +469,12 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
tag.rnode = rnode;
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -526,9 +540,12 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
tag.rnode = rnode;
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -722,9 +739,16 @@ local_cache_pages(PG_FUNCTION_ARGS)
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
#if PG_VERSION_NUM >= 160000
fctx->record[n_pages].relfilenode = entry->key.relNumber;
fctx->record[n_pages].reltablespace = entry->key.spcOid;
fctx->record[n_pages].reldatabase = entry->key.dbOid;
#else
fctx->record[n_pages].relfilenode = entry->key.rnode.relNode;
fctx->record[n_pages].reltablespace = entry->key.rnode.spcNode;
fctx->record[n_pages].reldatabase = entry->key.rnode.dbNode;
#endif
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;

View File

@@ -16,7 +16,11 @@
#include "postgres.h"
#include "access/xlogdefs.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
#include "storage/block.h"
#include "storage/smgr.h"
#include "lib/stringinfo.h"
@@ -25,6 +29,34 @@
#include "pg_config.h"
// This is a hack to avoid too many ifdefs in the function definitions.
#if PG_VERSION_NUM >= 160000
typedef RelFileLocator RelFileNode;
typedef RelFileLocatorBackend RelFileNodeBackend;
#define RelFileNodeBackendIsTemp RelFileLocatorBackendIsTemp
#endif
#if PG_VERSION_NUM >= 160000
#define RelnGetRnode(reln) (reln->smgr_rlocator.locator)
#define RnodeGetSpcOid(rnode) (rnode.spcOid)
#define RnodeGetDbOid(rnode) (rnode.dbOid)
#define RnodeGetRelNumber(rnode) (rnode.relNumber)
#define BufTagGetRnode(tag) (BufTagGetRelFileLocator(&tag))
#else
#define RelnGetRnode(reln) (reln->smgr_rnode.node)
#define RnodeGetSpcOid(rnode) (rnode.spcNode)
#define RnodeGetDbOid(rnode) (rnode.dbNode)
#define RnodeGetRelNumber(rnode) (rnode.relNode)
#define BufTagGetRnode(tag) (tag.rnode)
#endif
#define RelnGetSpcOid(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
#define RelnGetDbOid(reln) (RnodeGetDbOid(RelnGetRnode(reln)))
#define RelnGetRelNumber(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
typedef enum
{
/* pagestore_client -> pagestore */
@@ -85,7 +117,7 @@ typedef struct
typedef struct
{
NeonRequest req;
Oid dbNode;
Oid dbOid;
} NeonDbSizeRequest;
typedef struct

View File

@@ -58,7 +58,11 @@
#include "postmaster/autovacuum.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
#include "storage/buf_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
@@ -70,6 +74,8 @@
#include "access/xlogrecovery.h"
#endif
/*
* If DEBUG_COMPARE_LOCAL is defined, we pass through all the SMGR API
* calls to md.c, and *also* do the calls to the Page Server. On every
@@ -86,7 +92,10 @@
static char *hexdump_page(char *page);
#endif
#define IS_LOCAL_REL(reln) (reln->smgr_rnode.node.dbNode != 0 && reln->smgr_rnode.node.relNode > FirstNormalObjectId)
#define IS_LOCAL_REL(reln) (RelnGetDbOid(reln) != 0 && RelnGetRelNumber(reln) > FirstNormalObjectId)
const int SmgrTrace = DEBUG5;
@@ -184,7 +193,13 @@ typedef struct PrfHashEntry {
sizeof(BufferTag) \
)
#if PG_VERSION_NUM >= 160000
#define SH_EQUAL(tb, a, b) (BufferTagsEqual(&((a)->buftag),&((b)->buftag)))
#else
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
#endif
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
@@ -634,7 +649,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.lsn = 0,
.rnode = slot->buftag.rnode,
.rnode = BufTagGetRnode(slot->buftag),
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
@@ -649,7 +664,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
slot->buftag.rnode,
BufTagGetRnode(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum
);
@@ -729,8 +744,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
#if PG_VERSION_NUM >= 160000
Assert(BufferTagsEqual(&slot->buftag, &tag));
#else
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
#endif
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
@@ -893,9 +911,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode);
pq_sendint32(&s, msg_req->rnode.dbNode);
pq_sendint32(&s, msg_req->rnode.relNode);
pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum);
break;
@@ -906,9 +924,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode);
pq_sendint32(&s, msg_req->rnode.dbNode);
pq_sendint32(&s, msg_req->rnode.relNode);
pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum);
break;
@@ -919,7 +937,7 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
pq_sendint32(&s, msg_req->dbOid);
break;
}
@@ -929,9 +947,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode);
pq_sendint32(&s, msg_req->rnode.dbNode);
pq_sendint32(&s, msg_req->rnode.relNode);
pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum);
pq_sendint32(&s, msg_req->blkno);
@@ -1064,9 +1082,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonExistsRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode,
msg_req->rnode.dbNode,
msg_req->rnode.relNode);
RnodeGetSpcOid(msg_req->rnode),
RnodeGetDbOid(msg_req->rnode),
RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
@@ -1080,9 +1098,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonNblocksRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode,
msg_req->rnode.dbNode,
msg_req->rnode.relNode);
RnodeGetSpcOid(msg_req->rnode),
RnodeGetDbOid(msg_req->rnode),
RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
@@ -1096,9 +1114,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonGetPageRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode,
msg_req->rnode.dbNode,
msg_req->rnode.relNode);
RnodeGetSpcOid(msg_req->rnode),
RnodeGetDbOid(msg_req->rnode),
RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
@@ -1111,7 +1129,7 @@ nm_to_string(NeonMessage * msg)
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbOid);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfoChar(&s, '}');
@@ -1213,6 +1231,7 @@ static void
neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool force)
{
XLogRecPtr lsn = PageGetLSN(buffer);
RelFileNode rnode = RelnGetRnode(reln);
if (ShutdownRequestPending)
return;
@@ -1232,15 +1251,16 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
/* FSM is never WAL-logged and we don't care. */
XLogRecPtr recptr;
recptr = log_newpage_copy(&reln->smgr_rnode.node, forknum, blocknum, buffer, false);
recptr = log_newpage_copy(&rnode, forknum, blocknum, buffer, false);
XLogFlush(recptr);
lsn = recptr;
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum, LSN_FORMAT_ARGS(lsn))));
}
else if (lsn == InvalidXLogRecPtr)
@@ -1268,9 +1288,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is all-zeros",
blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum)));
}
else if (PageIsEmptyHeapPage(buffer))
@@ -1278,9 +1298,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum)));
}
else
@@ -1288,9 +1308,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(PANIC,
(errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum)));
}
}
@@ -1299,9 +1319,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum, LSN_FORMAT_ARGS(lsn))));
}
@@ -1309,7 +1329,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
* Remember the LSN on this page. When we read the page again, we must
* read the same or newer version of it.
*/
SetLastWrittenLSNForBlock(lsn, reln->smgr_rnode.node, forknum, blocknum);
SetLastWrittenLSNForBlock(lsn, rnode, forknum, blocknum);
}
/*
@@ -1459,6 +1479,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
@@ -1485,7 +1506,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (get_cached_relsize(reln->smgr_rnode.node, forkNum, &n_blocks))
if (get_cached_relsize(RelnGetRnode(reln), forkNum, &n_blocks))
{
return true;
}
@@ -1500,20 +1521,20 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
*
* For now, handle that special case here.
*/
if (reln->smgr_rnode.node.spcNode == 0 &&
reln->smgr_rnode.node.dbNode == 0 &&
reln->smgr_rnode.node.relNode == 0)
if (RelnGetSpcOid(reln) == 0 &&
RelnGetDbOid(reln) == 0 &&
RelnGetRelNumber(reln) == 0)
{
return false;
}
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forkNum, REL_METADATA_PSEUDO_BLOCKNO);
request_lsn = neon_get_request_lsn(&latest, rnode, forkNum, REL_METADATA_PSEUDO_BLOCKNO);
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.rnode = reln->smgr_rnode.node,
.rnode = rnode,
.forknum = forkNum};
resp = page_server_request(&request);
@@ -1529,9 +1550,9 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
@@ -1553,6 +1574,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
void
neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
{
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
case 0:
@@ -1571,9 +1594,8 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
}
elog(SmgrTrace, "Create relation %u/%u/%u.%u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln), RelnGetRelNumber(reln),
forkNum);
/*
@@ -1597,12 +1619,12 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
*/
if (isRedo)
{
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum,
update_cached_relsize(rnode, forkNum, 0);
get_cached_relsize(rnode, forkNum,
&reln->smgr_cached_nblocks[forkNum]);
}
else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
set_cached_relsize(rnode, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1639,7 +1661,12 @@ neon_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
mdunlink(rnode, forkNum, isRedo);
if (!RelFileNodeBackendIsTemp(rnode))
{
#if PG_VERSION_NUM >= 160000
forget_cached_relsize(rnode.locator, forkNum);
#else
forget_cached_relsize(rnode.node, forkNum);
#endif
}
}
@@ -1658,6 +1685,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
XLogRecPtr lsn;
BlockNumber n_blocks = 0;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
@@ -1707,17 +1735,16 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
neon_wallog_page(reln, forkNum, n_blocks++, buffer, true);
neon_wallog_page(reln, forkNum, blkno, buffer, false);
set_cached_relsize(reln->smgr_rnode.node, forkNum, blkno + 1);
set_cached_relsize(rnode, forkNum, blkno + 1);
lsn = PageGetLSN(buffer);
elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln), RelnGetRelNumber(reln),
forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn);
lfc_write(reln->smgr_rnode.node, forkNum, blkno, buffer);
lfc_write(rnode, forkNum, blkno, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1732,9 +1759,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (lsn == InvalidXLogRecPtr)
{
lsn = GetXLogInsertRecPtr();
SetLastWrittenLSNForBlock(lsn, reln->smgr_rnode.node, forkNum, blkno);
SetLastWrittenLSNForBlock(lsn, rnode, forkNum, blkno);
}
SetLastWrittenLSNForRelation(lsn, reln->smgr_rnode.node, forkNum);
SetLastWrittenLSNForRelation(lsn, rnode, forkNum);
}
/*
@@ -1778,6 +1805,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
BufferTag tag;
uint64 ring_index PG_USED_FOR_ASSERTS_ONLY;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
case 0: /* probably shouldn't happen, but ignore it */
@@ -1792,15 +1821,18 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (lfc_cache_contains(reln->smgr_rnode.node, forknum, blocknum))
if (lfc_cache_contains(rnode, forknum, blocknum))
return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forknum, blocknum);
#else
tag = (BufferTag) {
.rnode = reln->smgr_rnode.node,
.rnode = rnode,
.forkNum = forknum,
.blockNum = blocknum
};
#endif
ring_index = prefetch_register_buffer(tag, NULL, NULL);
Assert(ring_index < MyPState->ring_unused &&
@@ -1861,11 +1893,15 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
PrfHashEntry *entry;
PrefetchRequest *slot;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&buftag, &rnode, forkNum, blkno);
#else
buftag = (BufferTag) {
.rnode = rnode,
.forkNum = forkNum,
.blockNum = blkno,
.blockNum = blkno
};
#endif
/*
* The redo process does not lock pages that it needs to replay but are
@@ -1965,9 +2001,9 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
blkno,
rnode.spcNode,
rnode.dbNode,
rnode.relNode,
RnodeGetSpcOid(rnode),
RnodeGetDbOid(rnode),
RnodeGetRelNumber(rnode),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
@@ -1991,6 +2027,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
bool latest;
XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
@@ -2010,13 +2047,13 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
}
/* Try to read from local file cache */
if (lfc_read(reln->smgr_rnode.node, forkNum, blkno, buffer))
if (lfc_read(RelnGetRnode(reln), forkNum, blkno, buffer))
{
return;
}
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forkNum, blkno);
neon_read_at_lsn(reln->smgr_rnode.node, forkNum, blkno, request_lsn, latest, buffer);
request_lsn = neon_get_request_lsn(&latest, rnode, forkNum, blkno);
neon_read_at_lsn(rnode, forkNum, blkno, request_lsn, latest, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -2036,9 +2073,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(buffer));
@@ -2048,9 +2085,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf));
@@ -2065,9 +2102,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
@@ -2086,9 +2123,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
@@ -2133,7 +2170,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer, bool skipFsync)
{
XLogRecPtr lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
case 0:
@@ -2170,13 +2207,12 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
lsn = PageGetLSN(buffer);
elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln), RelnGetRelNumber(reln),
forknum, blocknum,
(uint32) (lsn >> 32), (uint32) lsn);
lfc_write(reln->smgr_rnode.node, forknum, blocknum, buffer);
lfc_write(rnode, forknum, blocknum, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -2194,6 +2230,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
@@ -2212,23 +2249,23 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (get_cached_relsize(reln->smgr_rnode.node, forknum, &n_blocks))
if (get_cached_relsize(RelnGetRnode(reln), forknum, &n_blocks))
{
elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum, n_blocks);
return n_blocks;
}
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forknum, REL_METADATA_PSEUDO_BLOCKNO);
request_lsn = neon_get_request_lsn(&latest, rnode, forknum, REL_METADATA_PSEUDO_BLOCKNO);
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.rnode = reln->smgr_rnode.node,
.rnode = rnode,
.forknum = forknum,
};
@@ -2245,9 +2282,9 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
@@ -2257,12 +2294,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
update_cached_relsize(reln->smgr_rnode.node, forknum, n_blocks);
update_cached_relsize(rnode, forknum, n_blocks);
elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln), RelnGetRelNumber(reln),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
n_blocks);
@@ -2275,7 +2311,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
* neon_db_size() -- Get the size of the database in bytes.
*/
int64
neon_dbsize(Oid dbNode)
neon_dbsize(Oid dbOid)
{
NeonResponse *resp;
int64 db_size;
@@ -2289,7 +2325,7 @@ neon_dbsize(Oid dbNode)
.req.tag = T_NeonDbSizeRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.dbNode = dbNode,
.dbOid = dbOid,
};
resp = page_server_request(&request);
@@ -2305,7 +2341,7 @@ neon_dbsize(Oid dbNode)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read db size of db %u from page server at lsn %X/%08X",
dbNode,
dbOid,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
@@ -2316,7 +2352,7 @@ neon_dbsize(Oid dbNode)
}
elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
dbNode,
dbOid,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
db_size);
@@ -2331,6 +2367,7 @@ void
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
XLogRecPtr lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence)
{
@@ -2350,7 +2387,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
set_cached_relsize(reln->smgr_rnode.node, forknum, nblocks);
set_cached_relsize(rnode, forknum, nblocks);
/*
* Truncating a relation drops all its buffers from the buffer cache
@@ -2378,7 +2415,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
* for the extended pages, so there's no harm in leaving behind obsolete
* entries for the truncated chunks.
*/
SetLastWrittenLSNForRelation(lsn, reln->smgr_rnode.node, forknum);
SetLastWrittenLSNForRelation(lsn, rnode, forknum);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -2448,9 +2485,9 @@ neon_start_unlogged_build(SMgrRelation reln)
ereport(SmgrTrace,
(errmsg("starting unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode)));
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln))));
switch (reln->smgr_relpersistence)
{
@@ -2500,9 +2537,9 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
ereport(SmgrTrace,
(errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode)));
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return;
@@ -2529,9 +2566,9 @@ neon_end_unlogged_build(SMgrRelation reln)
ereport(SmgrTrace,
(errmsg("ending unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode)));
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
@@ -2544,16 +2581,24 @@ neon_end_unlogged_build(SMgrRelation reln)
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */
rnode = reln->smgr_rnode;
#if PG_VERSION_NUM >= 160000
rnode.locator = RelnGetRnode(reln);
#else
rnode.node = RelnGetRnode(reln);
#endif
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
rnode.node.spcNode,
rnode.node.dbNode,
rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum);
#if PG_VERSION_NUM >= 160000
forget_cached_relsize(rnode.locator, forknum);
#else
forget_cached_relsize(rnode.node, forknum);
#endif
mdclose(reln, forknum);
/* use isRedo == true, so that we drop it immediately */
mdunlink(rnode, forknum, true);
@@ -2706,10 +2751,16 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
* regardless of whether the block is stored in shared buffers.
* See also this function's top comment.
*/
if (!OidIsValid(rnode.dbNode))
if (!OidIsValid(RnodeGetDbOid(rnode)))
return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forknum, blkno);
#else
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
#endif
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);

View File

@@ -15,7 +15,11 @@
#include "postgres.h"
#include "pagestore_client.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
#include "storage/smgr.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
@@ -28,6 +32,7 @@
#include "miscadmin.h"
#endif
typedef struct
{
RelFileNode rnode;

View File

@@ -1394,7 +1394,12 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
#if PG_VERSION_NUM >= 160000
bool must_use_password = false;
wrconn = walrcv_connect(safekeeper[donor].conninfo, false, must_use_password, "wal_proposer_recovery", &err);
#else
wrconn = walrcv_connect(safekeeper[donor].conninfo, false, "wal_proposer_recovery", &err);
#endif
if (!wrconn)
{
ereport(WARNING,

View File

@@ -26,6 +26,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM >= 160000
#include "utils/guc.h"
#endif
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID

View File

@@ -128,7 +128,11 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
else
isvalid = false;
bufferid = BufferDescriptorGetBuffer(bufHdr);
#if PG_VERSION_NUM >= 160000
rnode = BufTagGetRelFileLocator(&bufHdr->tag);
#else
rnode = bufHdr->tag.rnode;
#endif
forknum = bufHdr->tag.forkNum;
blocknum = bufHdr->tag.blockNum;
@@ -238,7 +242,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(rel->rd_node, forknum, blkno, read_lsn, request_latest, raw_page_data);
neon_read_at_lsn(RelnGetRnode(RelationGetSmgr(rel)), forknum, blkno, read_lsn, request_latest, raw_page_data);
relation_close(rel, AccessShareLock);
@@ -267,11 +271,17 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
{
#if PG_VERSION_NUM >= 160000
RelFileLocator rnode = {
.spcOid = PG_GETARG_OID(0),
.dbOid = PG_GETARG_OID(1),
.relNumber = PG_GETARG_OID(2)};
#else
RelFileNode rnode = {
.spcNode = PG_GETARG_OID(0),
.dbNode = PG_GETARG_OID(1),
.relNode = PG_GETARG_OID(2)};
#endif
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);

View File

@@ -21,7 +21,6 @@
#include "access/xlog.h"
#include "storage/block.h"
#include "storage/buf_internals.h"
#include "storage/relfilenode.h"
#include "storage/smgr.h"
#if PG_VERSION_NUM >= 150000
@@ -30,6 +29,7 @@
#include "inmem_smgr.h"
/* Size of the in-memory smgr */
#define MAX_PAGES 64
@@ -46,12 +46,22 @@ locate_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno)
/* We only hold a small number of pages, so linear search */
for (int i = 0; i < used_pages; i++)
{
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode)
#if PG_VERSION_NUM >= 160000
if (BufTagMatchesRelFileLocator(&page_tag[i], &reln->smgr_rlocator.locator)
&& forknum == page_tag[i].forkNum
&& blkno == page_tag[i].blockNum)
{
return i;
}
#else
if (RelFileNodeEquals(RelnGetRnode(reln), page_tag[i].rnode)
&& forknum == page_tag[i].forkNum
&& blkno == page_tag[i].blockNum)
{
return i;
}
#endif
}
return -1;
}
@@ -97,8 +107,12 @@ inmem_exists(SMgrRelation reln, ForkNumber forknum)
{
for (int i = 0; i < used_pages; i++)
{
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode)
&& forknum == page_tag[i].forkNum)
#if PG_VERSION_NUM >= 160000
if (BufTagMatchesRelFileLocator(&page_tag[i], &reln->smgr_rlocator.locator)
#else
if (RelFileNodeEquals(RelnGetRnode(reln), page_tag[i].rnode)
#endif
&& forknum == page_tag[i].forkNum)
{
return true;
}
@@ -216,9 +230,9 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum,
blocknum,
used_pages);
@@ -227,14 +241,19 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
pg = used_pages;
used_pages++;
INIT_BUFFERTAG(page_tag[pg], reln->smgr_rnode.node, forknum, blocknum);
#if PG_VERSION_NUM >= 160000
InitBufferTag(&page_tag[pg], &RelnGetRnode(reln), forknum, blocknum);
#else
INIT_BUFFERTAG(page_tag[pg], RelnGetRnode(reln), forknum, blocknum);
#endif
}
else
{
elog(DEBUG1, "inmem_write() called for %u/%u/%u.%u blk %u: found at %u",
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode,
RelnGetSpcOid(reln),
RelnGetDbOid(reln),
RelnGetRelNumber(reln),
forknum,
blocknum,
used_pages);

View File

@@ -11,6 +11,40 @@
#ifndef INMEM_SMGR_H
#define INMEM_SMGR_H
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
// This is a hack to avoid too many ifdefs in the function definitions.
#if PG_VERSION_NUM >= 160000
typedef RelFileLocator RelFileNode;
typedef RelFileLocatorBackend RelFileNodeBackend;
#define RelFileNodeBackendIsTemp RelFileLocatorBackendIsTemp
#endif
#if PG_VERSION_NUM >= 160000
#define RelnGetRnode(reln) (reln->smgr_rlocator.locator)
#define RnodeGetSpcOid(rnode) (rnode.spcOid)
#define RnodeGetDbOid(rnode) (rnode.dbOid)
#define RnodeGetRelNumber(rnode) (rnode.relNumber)
#define BufTagGetRnode(tag) (BufTagGetRelFileLocator(&tag))
#else
#define RelnGetRnode(reln) (reln->smgr_rnode.node)
#define RnodeGetSpcOid(rnode) (rnode.spcNode)
#define RnodeGetDbOid(rnode) (rnode.dbNode)
#define RnodeGetRelNumber(rnode) (rnode.relNode)
#define BufTagGetRnode(tag) (tag.rnode)
#endif
#define RelnGetSpcOid(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
#define RelnGetDbOid(reln) (RnodeGetDbOid(RelnGetRnode(reln)))
#define RelnGetRelNumber(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode);
extern void smgr_init_inmem(void);

View File

@@ -62,8 +62,10 @@
#endif
#ifndef HAVE_GETRUSAGE
#if PG_VERSION_NUM < 160000
#include "rusagestub.h"
#endif
#endif
#include "access/clog.h"
#include "access/commit_ts.h"
@@ -117,6 +119,7 @@
#include "neon_seccomp.h"
#endif
PG_MODULE_MAGIC;
static int ReadRedoCommand(StringInfo inBuf);
@@ -662,18 +665,31 @@ BeginRedoForBlock(StringInfo input_message)
* BlockNumber
*/
forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4);
wal_redo_buffer = InvalidBuffer;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&target_redo_tag, &rnode, forknum, blknum);
#else
INIT_BUFFERTAG(target_redo_tag, rnode, forknum, blknum);
#endif
elog(TRACE, "BeginRedoForBlock %u/%u/%u.%d blk %u",
target_redo_tag.rnode.spcNode,
target_redo_tag.rnode.dbNode,
target_redo_tag.rnode.relNode,
#if PG_VERSION_NUM >= 160000
target_redo_tag.spcOid, target_redo_tag.dbOid, target_redo_tag.relNumber,
#else
target_redo_tag.rnode.spcNode, target_redo_tag.rnode.dbNode, target_redo_tag.rnode.relNode,
#endif
target_redo_tag.forkNum,
target_redo_tag.blockNum);
@@ -709,9 +725,15 @@ PushPage(StringInfo input_message)
* 8k page content
*/
forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4);
content = pq_getmsgbytes(input_message, BLCKSZ);
@@ -831,7 +853,12 @@ ApplyRecord(StringInfo input_message)
*/
if (BufferIsInvalid(wal_redo_buffer))
{
wal_redo_buffer = NeonRedoReadBuffer(target_redo_tag.rnode,
wal_redo_buffer = NeonRedoReadBuffer(
#if PG_VERSION_NUM >= 160000
BufTagGetRelFileLocator(&target_redo_tag),
#else
target_redo_tag.rnode,
#endif
target_redo_tag.forkNum,
target_redo_tag.blockNum,
RBM_NORMAL);
@@ -873,12 +900,43 @@ apply_error_callback(void *arg)
}
#if PG_VERSION_NUM >= 160000
static bool
redo_block_filter(XLogReaderState *record, uint8 block_id)
{
BufferTag target_tag;
RelFileLocator rlocator;
XLogRecGetBlockTag(record, block_id,
&rlocator, &target_tag.forkNum, &target_tag.blockNum);
target_tag.spcOid = rlocator.spcOid;
target_tag.dbOid = rlocator.dbOid;
target_tag.relNumber = rlocator.relNumber;
/*
* Can a WAL redo function ever access a relation other than the one that
* it modifies? I don't see why it would.
*/
if (RelFileLocatorEquals(BufTagGetRelFileLocator(&target_tag), BufTagGetRelFileLocator(&target_redo_tag)))
elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u",
target_tag.spcOid, target_tag.dbOid, target_tag.relNumber,
target_tag.forkNum, target_tag.blockNum);
/*
* If this block isn't one we are currently restoring, then return 'true'
* so that this gets ignored
*/
return !BufferTagsEqual(&target_tag, &target_redo_tag);
}
#else
static bool
redo_block_filter(XLogReaderState *record, uint8 block_id)
{
BufferTag target_tag;
#if PG_VERSION_NUM >= 150000
XLogRecGetBlockTag(record, block_id,
&target_tag.rnode, &target_tag.forkNum, &target_tag.blockNum);
@@ -897,14 +955,18 @@ redo_block_filter(XLogReaderState *record, uint8 block_id)
*/
if (!RelFileNodeEquals(target_tag.rnode, target_redo_tag.rnode))
elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u",
target_tag.rnode.spcNode, target_tag.rnode.dbNode, target_tag.rnode.relNode, target_tag.forkNum, target_tag.blockNum);
target_tag.rnode.spcNode, target_tag.rnode.dbNode, target_tag.rnode.relNode,
target_tag.forkNum, target_tag.blockNum);
/*
* If this block isn't one we are currently restoring, then return 'true'
* so that this gets ignored
*/
return !BUFFERTAGS_EQUAL(target_tag, target_redo_tag);
}
#endif
/*
* Get a page image back from buffer cache.
@@ -931,9 +993,15 @@ GetPage(StringInfo input_message)
* BlockNumber
*/
forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4);
/* FIXME: check that we got a BeginRedoForBlock message or this earlier */
@@ -961,7 +1029,11 @@ GetPage(StringInfo input_message)
} while (tot_written < BLCKSZ);
ReleaseBuffer(buf);
#if PG_VERSION_NUM >= 160000
DropRelationAllLocalBuffers(rnode);
#else
DropRelFileNodeAllLocalBuffers(rnode);
#endif
wal_redo_buffer = InvalidBuffer;
elog(TRACE, "Page sent back for block %u", blknum);