Compare commits

...

3 Commits

Author SHA1 Message Date
Anastasia Lubennikova
12f0c0ec8f Add vendor/postgres-v16 and v16 support in pgxn/neon 2023-07-17 17:51:27 +03:00
Anastasia Lubennikova
13bd44a1f0 Add vendor/postgres-v16 submodule 2023-07-17 17:46:22 +03:00
Anastasia Lubennikova
cda148d40d Add version 16 support in the rust and python code 2023-07-17 17:32:10 +03:00
24 changed files with 454 additions and 159 deletions

View File

@@ -18,6 +18,7 @@
!trace/ !trace/
!vendor/postgres-v14/ !vendor/postgres-v14/
!vendor/postgres-v15/ !vendor/postgres-v15/
!vendor/postgres-v16/
!workspace_hack/ !workspace_hack/
!neon_local/ !neon_local/
!scripts/ninstall.sh !scripts/ninstall.sh

4
.gitmodules vendored
View File

@@ -6,3 +6,7 @@
path = vendor/postgres-v15 path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon branch = REL_15_STABLE_neon
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon

View File

@@ -12,6 +12,7 @@ WORKDIR /home/nonroot
COPY --chown=nonroot vendor/postgres-v14 vendor/postgres-v14 COPY --chown=nonroot vendor/postgres-v14 vendor/postgres-v14
COPY --chown=nonroot vendor/postgres-v15 vendor/postgres-v15 COPY --chown=nonroot vendor/postgres-v15 vendor/postgres-v15
COPY --chown=nonroot vendor/postgres-v16 vendor/postgres-v16
COPY --chown=nonroot pgxn pgxn COPY --chown=nonroot pgxn pgxn
COPY --chown=nonroot Makefile Makefile COPY --chown=nonroot Makefile Makefile
COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
@@ -39,6 +40,7 @@ ARG CACHEPOT_BUCKET=neon-github-dev
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --chown=nonroot . . COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end. # Show build caching stats to check if it was used in the end.
@@ -79,6 +81,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/ COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/ COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=pg-build /home/nonroot/pg_install/v16 /usr/local/v16/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/ COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config. # By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config.

View File

@@ -83,6 +83,8 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
# I'm not sure why it wouldn't work, but this is the only place (apart from # I'm not sure why it wouldn't work, but this is the only place (apart from
# the "build-all-versions" entry points) where direct mention of PostgreSQL # the "build-all-versions" entry points) where direct mention of PostgreSQL
# versions is used. # versions is used.
.PHONY: postgres-configure-v16
postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status
.PHONY: postgres-configure-v15 .PHONY: postgres-configure-v15
postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
.PHONY: postgres-configure-v14 .PHONY: postgres-configure-v14
@@ -165,28 +167,33 @@ neon-pg-ext-clean-%:
.PHONY: neon-pg-ext .PHONY: neon-pg-ext
neon-pg-ext: \ neon-pg-ext: \
neon-pg-ext-v14 \ neon-pg-ext-v14 \
neon-pg-ext-v15 neon-pg-ext-v15 \
neon-pg-ext-v16
.PHONY: neon-pg-ext-clean .PHONY: neon-pg-ext-clean
neon-pg-ext-clean: \ neon-pg-ext-clean: \
neon-pg-ext-clean-v14 \ neon-pg-ext-clean-v14 \
neon-pg-ext-clean-v15 neon-pg-ext-clean-v15 \
neon-pg-ext-clean-v16
# shorthand to build all Postgres versions # shorthand to build all Postgres versions
.PHONY: postgres .PHONY: postgres
postgres: \ postgres: \
postgres-v14 \ postgres-v14 \
postgres-v15 postgres-v15 \
postgres-v16
.PHONY: postgres-headers .PHONY: postgres-headers
postgres-headers: \ postgres-headers: \
postgres-headers-v14 \ postgres-headers-v14 \
postgres-headers-v15 postgres-headers-v15 \
postgres-headers-v16
.PHONY: postgres-clean .PHONY: postgres-clean
postgres-clean: \ postgres-clean: \
postgres-clean-v14 \ postgres-clean-v14 \
postgres-clean-v15 postgres-clean-v15 \
postgres-clean-v16
# This doesn't remove the effects of 'configure'. # This doesn't remove the effects of 'configure'.
.PHONY: clean .PHONY: clean

View File

@@ -169,6 +169,7 @@ impl LocalEnv {
match pg_version { match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))), 14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))), 15 => Ok(path.join(format!("v{pg_version}"))),
16 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }
@@ -177,6 +178,7 @@ impl LocalEnv {
match pg_version { match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")), 14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")), 15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
16 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }
@@ -184,6 +186,7 @@ impl LocalEnv {
match pg_version { match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")), 14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")), 15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
16 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }

View File

@@ -56,7 +56,7 @@ fn main() -> anyhow::Result<()> {
PathBuf::from("pg_install") PathBuf::from("pg_install")
}; };
for pg_version in &["v14", "v15"] { for pg_version in &["v14", "v15", "v16"] {
let mut pg_install_dir_versioned = pg_install_dir.join(pg_version); let mut pg_install_dir_versioned = pg_install_dir.join(pg_version);
if pg_install_dir_versioned.is_relative() { if pg_install_dir_versioned.is_relative() {
let cwd = env::current_dir().context("Failed to get current_dir")?; let cwd = env::current_dir().context("Failed to get current_dir")?;

View File

@@ -51,6 +51,7 @@ macro_rules! for_all_postgres_versions {
($macro:tt) => { ($macro:tt) => {
$macro!(v14); $macro!(v14);
$macro!(v15); $macro!(v15);
$macro!(v16);
}; };
} }
@@ -92,9 +93,10 @@ pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool> { pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
match version { match version {
14 => Ok(bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0), 14 => Ok(bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0),
15 => Ok(bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0 15 | 16 => Ok(bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0 || bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0), || bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0),
_ => anyhow::bail!("Unknown version {}", version), _ => anyhow::bail!("Unknown version {}", version),
} }
} }
@@ -110,6 +112,7 @@ pub fn generate_wal_segment(
match pg_version { match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn), 14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn), 15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
16 => v16::xlog_utils::generate_wal_segment(segno, system_id, lsn),
_ => Err(SerializeError::BadInput), _ => Err(SerializeError::BadInput),
} }
} }
@@ -123,6 +126,7 @@ pub fn generate_pg_control(
match pg_version { match pg_version {
14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn), 14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn), 15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
16 => v16::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
_ => anyhow::bail!("Unknown version {}", pg_version), _ => anyhow::bail!("Unknown version {}", pg_version),
} }
} }
@@ -197,7 +201,7 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
pub mod waldecoder { pub mod waldecoder {
use crate::{v14, v15}; use crate::{v14, v15, v16};
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32; use std::num::NonZeroU32;
use thiserror::Error; use thiserror::Error;
@@ -259,6 +263,10 @@ pub mod waldecoder {
use self::v15::waldecoder_handler::WalStreamDecoderHandler; use self::v15::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal() self.poll_decode_internal()
} }
16 => {
use self::v16::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal()
}
_ => Err(WalDecodeError { _ => Err(WalDecodeError {
msg: format!("Unknown version {}", self.pg_version), msg: format!("Unknown version {}", self.pg_version),
lsn: self.lsn, lsn: self.lsn,

View File

@@ -0,0 +1 @@

View File

@@ -52,6 +52,7 @@ impl Conf {
match self.pg_version { match self.pg_version {
14 => Ok(path.join(format!("v{}", self.pg_version))), 14 => Ok(path.join(format!("v{}", self.pg_version))),
15 => Ok(path.join(format!("v{}", self.pg_version))), 15 => Ok(path.join(format!("v{}", self.pg_version))),
16 => Ok(path.join(format!("v{}", self.pg_version))),
_ => bail!("Unsupported postgres version: {}", self.pg_version), _ => bail!("Unsupported postgres version: {}", self.pg_version),
} }
} }

View File

@@ -655,6 +655,7 @@ impl PageServerConf {
match pg_version { match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))), 14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))), 15 => Ok(path.join(format!("v{pg_version}"))),
16 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }
@@ -663,6 +664,7 @@ impl PageServerConf {
match pg_version { match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")), 14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")), 15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
16 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }
@@ -670,6 +672,7 @@ impl PageServerConf {
match pg_version { match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")), 14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")), 15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
16 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version), _ => bail!("Unsupported postgres version: {}", pg_version),
} }
} }

View File

@@ -360,6 +360,7 @@ impl XlXactParsedRecord {
} }
} }
let mut xnodes = Vec::<RelFileNode>::new(); let mut xnodes = Vec::<RelFileNode>::new();
// In v16 this XACT_XINFO_HAS_RELFILENODES is renamed to XACT_XINFO_HAS_RELFILELOCATORS
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le(); let nrels = buf.get_i32_le();
for _i in 0..nrels { for _i in 0..nrels {

View File

@@ -25,7 +25,11 @@
#include "pagestore_client.h" #include "pagestore_client.h"
#include "access/parallel.h" #include "access/parallel.h"
#include "postmaster/bgworker.h" #include "postmaster/bgworker.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
#endif
#include "storage/buf_internals.h" #include "storage/buf_internals.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "storage/ipc.h" #include "storage/ipc.h"
@@ -39,6 +43,7 @@
#include "postmaster/bgworker.h" #include "postmaster/bgworker.h"
#include "postmaster/interrupt.h" #include "postmaster/interrupt.h"
/* /*
* Local file cache is used to temporary store relations pages in local file system. * Local file cache is used to temporary store relations pages in local file system.
* All blocks of all relations are stored inside one file and addressed using shared hash map. * All blocks of all relations are stored inside one file and addressed using shared hash map.
@@ -360,9 +365,12 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false; return false;
tag.rnode = rnode; #if PG_VERSION_NUM >= 160000
tag.forkNum = forkNum; InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); #else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED); LWLockAcquire(lfc_lock, LW_SHARED);
@@ -387,7 +395,11 @@ lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return; return;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1))); INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
@@ -457,10 +469,12 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false; return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
tag.rnode = rnode;
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -526,9 +540,12 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return; return;
tag.rnode = rnode; #if PG_VERSION_NUM >= 160000
tag.forkNum = forkNum; InitBufferTag(&tag, &rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1); #else
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
#endif
hash = get_hash_value(lfc_hash, &tag); hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE); LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -722,9 +739,16 @@ local_cache_pages(PG_FUNCTION_ARGS)
if (entry->bitmap[i >> 5] & (1 << (i & 31))) if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{ {
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i; fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
#if PG_VERSION_NUM >= 160000
fctx->record[n_pages].relfilenode = entry->key.relNumber;
fctx->record[n_pages].reltablespace = entry->key.spcOid;
fctx->record[n_pages].reldatabase = entry->key.dbOid;
#else
fctx->record[n_pages].relfilenode = entry->key.rnode.relNode; fctx->record[n_pages].relfilenode = entry->key.rnode.relNode;
fctx->record[n_pages].reltablespace = entry->key.rnode.spcNode; fctx->record[n_pages].reltablespace = entry->key.rnode.spcNode;
fctx->record[n_pages].reldatabase = entry->key.rnode.dbNode; fctx->record[n_pages].reldatabase = entry->key.rnode.dbNode;
#endif
fctx->record[n_pages].forknum = entry->key.forkNum; fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i; fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count; fctx->record[n_pages].accesscount = entry->access_count;

View File

@@ -16,7 +16,11 @@
#include "postgres.h" #include "postgres.h"
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
#endif
#include "storage/block.h" #include "storage/block.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@@ -25,6 +29,34 @@
#include "pg_config.h" #include "pg_config.h"
// This is a hack to avoid too many ifdefs in the function definitions.
#if PG_VERSION_NUM >= 160000
typedef RelFileLocator RelFileNode;
typedef RelFileLocatorBackend RelFileNodeBackend;
#define RelFileNodeBackendIsTemp RelFileLocatorBackendIsTemp
#endif
#if PG_VERSION_NUM >= 160000
#define RelnGetRnode(reln) (reln->smgr_rlocator.locator)
#define RnodeGetSpcOid(rnode) (rnode.spcOid)
#define RnodeGetDbOid(rnode) (rnode.dbOid)
#define RnodeGetRelNumber(rnode) (rnode.relNumber)
#define BufTagGetRnode(tag) (BufTagGetRelFileLocator(&tag))
#else
#define RelnGetRnode(reln) (reln->smgr_rnode.node)
#define RnodeGetSpcOid(rnode) (rnode.spcNode)
#define RnodeGetDbOid(rnode) (rnode.dbNode)
#define RnodeGetRelNumber(rnode) (rnode.relNode)
#define BufTagGetRnode(tag) (tag.rnode)
#endif
#define RelnGetSpcOid(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
#define RelnGetDbOid(reln) (RnodeGetDbOid(RelnGetRnode(reln)))
#define RelnGetRelNumber(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
typedef enum typedef enum
{ {
/* pagestore_client -> pagestore */ /* pagestore_client -> pagestore */
@@ -85,7 +117,7 @@ typedef struct
typedef struct typedef struct
{ {
NeonRequest req; NeonRequest req;
Oid dbNode; Oid dbOid;
} NeonDbSizeRequest; } NeonDbSizeRequest;
typedef struct typedef struct

View File

@@ -58,7 +58,11 @@
#include "postmaster/autovacuum.h" #include "postmaster/autovacuum.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
#endif
#include "storage/buf_internals.h" #include "storage/buf_internals.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "storage/md.h" #include "storage/md.h"
@@ -70,6 +74,8 @@
#include "access/xlogrecovery.h" #include "access/xlogrecovery.h"
#endif #endif
/* /*
* If DEBUG_COMPARE_LOCAL is defined, we pass through all the SMGR API * If DEBUG_COMPARE_LOCAL is defined, we pass through all the SMGR API
* calls to md.c, and *also* do the calls to the Page Server. On every * calls to md.c, and *also* do the calls to the Page Server. On every
@@ -86,7 +92,10 @@
static char *hexdump_page(char *page); static char *hexdump_page(char *page);
#endif #endif
#define IS_LOCAL_REL(reln) (reln->smgr_rnode.node.dbNode != 0 && reln->smgr_rnode.node.relNode > FirstNormalObjectId)
#define IS_LOCAL_REL(reln) (RelnGetDbOid(reln) != 0 && RelnGetRelNumber(reln) > FirstNormalObjectId)
const int SmgrTrace = DEBUG5; const int SmgrTrace = DEBUG5;
@@ -184,7 +193,13 @@ typedef struct PrfHashEntry {
sizeof(BufferTag) \ sizeof(BufferTag) \
) )
#if PG_VERSION_NUM >= 160000
#define SH_EQUAL(tb, a, b) (BufferTagsEqual(&((a)->buftag),&((b)->buftag)))
#else
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag)) #define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
#endif
#define SH_SCOPE static inline #define SH_SCOPE static inline
#define SH_DEFINE #define SH_DEFINE
#define SH_DECLARE #define SH_DECLARE
@@ -634,7 +649,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
.req.tag = T_NeonGetPageRequest, .req.tag = T_NeonGetPageRequest,
.req.latest = false, .req.latest = false,
.req.lsn = 0, .req.lsn = 0,
.rnode = slot->buftag.rnode, .rnode = BufTagGetRnode(slot->buftag),
.forknum = slot->buftag.forkNum, .forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum, .blkno = slot->buftag.blockNum,
}; };
@@ -649,7 +664,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
{ {
XLogRecPtr lsn = neon_get_request_lsn( XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest, &request.req.latest,
slot->buftag.rnode, BufTagGetRnode(slot->buftag),
slot->buftag.forkNum, slot->buftag.forkNum,
slot->buftag.blockNum slot->buftag.blockNum
); );
@@ -729,8 +744,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
Assert(slot->status != PRFS_UNUSED); Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index && Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused); ring_index < MyPState->ring_unused);
#if PG_VERSION_NUM >= 160000
Assert(BufferTagsEqual(&slot->buftag, &tag));
#else
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag)); Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
#endif
/* /*
* If we want a specific lsn, we do not accept requests that were made * If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN. * with a potentially different LSN.
@@ -893,9 +911,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest); pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn); pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode); pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.dbNode); pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.relNode); pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum); pq_sendbyte(&s, msg_req->forknum);
break; break;
@@ -906,9 +924,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest); pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn); pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode); pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.dbNode); pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.relNode); pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum); pq_sendbyte(&s, msg_req->forknum);
break; break;
@@ -919,7 +937,7 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest); pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn); pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode); pq_sendint32(&s, msg_req->dbOid);
break; break;
} }
@@ -929,9 +947,9 @@ nm_pack_request(NeonRequest * msg)
pq_sendbyte(&s, msg_req->req.latest); pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn); pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->rnode.spcNode); pq_sendint32(&s, RnodeGetSpcOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.dbNode); pq_sendint32(&s, RnodeGetDbOid(msg_req->rnode));
pq_sendint32(&s, msg_req->rnode.relNode); pq_sendint32(&s, RnodeGetRelNumber(msg_req->rnode));
pq_sendbyte(&s, msg_req->forknum); pq_sendbyte(&s, msg_req->forknum);
pq_sendint32(&s, msg_req->blkno); pq_sendint32(&s, msg_req->blkno);
@@ -1064,9 +1082,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonExistsRequest\""); appendStringInfoString(&s, "{\"type\": \"NeonExistsRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"", appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode, RnodeGetSpcOid(msg_req->rnode),
msg_req->rnode.dbNode, RnodeGetDbOid(msg_req->rnode),
msg_req->rnode.relNode); RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
@@ -1080,9 +1098,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonNblocksRequest\""); appendStringInfoString(&s, "{\"type\": \"NeonNblocksRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"", appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode, RnodeGetSpcOid(msg_req->rnode),
msg_req->rnode.dbNode, RnodeGetDbOid(msg_req->rnode),
msg_req->rnode.relNode); RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
@@ -1096,9 +1114,9 @@ nm_to_string(NeonMessage * msg)
appendStringInfoString(&s, "{\"type\": \"NeonGetPageRequest\""); appendStringInfoString(&s, "{\"type\": \"NeonGetPageRequest\"");
appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"", appendStringInfo(&s, ", \"rnode\": \"%u/%u/%u\"",
msg_req->rnode.spcNode, RnodeGetSpcOid(msg_req->rnode),
msg_req->rnode.dbNode, RnodeGetDbOid(msg_req->rnode),
msg_req->rnode.relNode); RnodeGetRelNumber(msg_req->rnode));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno); appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
@@ -1111,7 +1129,7 @@ nm_to_string(NeonMessage * msg)
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\""); appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode); appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbOid);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfoChar(&s, '}'); appendStringInfoChar(&s, '}');
@@ -1213,6 +1231,7 @@ static void
neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool force) neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool force)
{ {
XLogRecPtr lsn = PageGetLSN(buffer); XLogRecPtr lsn = PageGetLSN(buffer);
RelFileNode rnode = RelnGetRnode(reln);
if (ShutdownRequestPending) if (ShutdownRequestPending)
return; return;
@@ -1232,15 +1251,16 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
/* FSM is never WAL-logged and we don't care. */ /* FSM is never WAL-logged and we don't care. */
XLogRecPtr recptr; XLogRecPtr recptr;
recptr = log_newpage_copy(&reln->smgr_rnode.node, forknum, blocknum, buffer, false);
recptr = log_newpage_copy(&rnode, forknum, blocknum, buffer, false);
XLogFlush(recptr); XLogFlush(recptr);
lsn = recptr; lsn = recptr;
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X", (errmsg("Page %u of relation %u/%u/%u.%u was force logged. Evicted at lsn=%X/%X",
blocknum, blocknum,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, LSN_FORMAT_ARGS(lsn)))); forknum, LSN_FORMAT_ARGS(lsn))));
} }
else if (lsn == InvalidXLogRecPtr) else if (lsn == InvalidXLogRecPtr)
@@ -1268,9 +1288,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is all-zeros", (errmsg("Page %u of relation %u/%u/%u.%u is all-zeros",
blocknum, blocknum,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum))); forknum)));
} }
else if (PageIsEmptyHeapPage(buffer)) else if (PageIsEmptyHeapPage(buffer))
@@ -1278,9 +1298,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN", (errmsg("Page %u of relation %u/%u/%u.%u is an empty heap page with no LSN",
blocknum, blocknum,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum))); forknum)));
} }
else else
@@ -1288,9 +1308,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(PANIC, ereport(PANIC,
(errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN", (errmsg("Page %u of relation %u/%u/%u.%u is evicted with zero LSN",
blocknum, blocknum,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum))); forknum)));
} }
} }
@@ -1299,9 +1319,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X", (errmsg("Page %u of relation %u/%u/%u.%u is already wal logged at lsn=%X/%X",
blocknum, blocknum,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, LSN_FORMAT_ARGS(lsn)))); forknum, LSN_FORMAT_ARGS(lsn))));
} }
@@ -1309,7 +1329,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
* Remember the LSN on this page. When we read the page again, we must * Remember the LSN on this page. When we read the page again, we must
* read the same or newer version of it. * read the same or newer version of it.
*/ */
SetLastWrittenLSNForBlock(lsn, reln->smgr_rnode.node, forknum, blocknum); SetLastWrittenLSNForBlock(lsn, rnode, forknum, blocknum);
} }
/* /*
@@ -1459,6 +1479,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
BlockNumber n_blocks; BlockNumber n_blocks;
bool latest; bool latest;
XLogRecPtr request_lsn; XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -1485,7 +1506,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (get_cached_relsize(reln->smgr_rnode.node, forkNum, &n_blocks)) if (get_cached_relsize(RelnGetRnode(reln), forkNum, &n_blocks))
{ {
return true; return true;
} }
@@ -1500,20 +1521,20 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
* *
* For now, handle that special case here. * For now, handle that special case here.
*/ */
if (reln->smgr_rnode.node.spcNode == 0 && if (RelnGetSpcOid(reln) == 0 &&
reln->smgr_rnode.node.dbNode == 0 && RelnGetDbOid(reln) == 0 &&
reln->smgr_rnode.node.relNode == 0) RelnGetRelNumber(reln) == 0)
{ {
return false; return false;
} }
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forkNum, REL_METADATA_PSEUDO_BLOCKNO); request_lsn = neon_get_request_lsn(&latest, rnode, forkNum, REL_METADATA_PSEUDO_BLOCKNO);
{ {
NeonExistsRequest request = { NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest, .req.tag = T_NeonExistsRequest,
.req.latest = latest, .req.latest = latest,
.req.lsn = request_lsn, .req.lsn = request_lsn,
.rnode = reln->smgr_rnode.node, .rnode = rnode,
.forknum = forkNum}; .forknum = forkNum};
resp = page_server_request(&request); resp = page_server_request(&request);
@@ -1529,9 +1550,9 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg("could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s", errdetail("page server returned error: %s",
@@ -1553,6 +1574,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
void void
neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
{ {
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
@@ -1571,9 +1594,8 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
} }
elog(SmgrTrace, "Create relation %u/%u/%u.%u", elog(SmgrTrace, "Create relation %u/%u/%u.%u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln), RelnGetRelNumber(reln),
reln->smgr_rnode.node.relNode,
forkNum); forkNum);
/* /*
@@ -1597,12 +1619,12 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
*/ */
if (isRedo) if (isRedo)
{ {
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0); update_cached_relsize(rnode, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum, get_cached_relsize(rnode, forkNum,
&reln->smgr_cached_nblocks[forkNum]); &reln->smgr_cached_nblocks[forkNum]);
} }
else else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0); set_cached_relsize(rnode, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -1639,7 +1661,12 @@ neon_unlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
mdunlink(rnode, forkNum, isRedo); mdunlink(rnode, forkNum, isRedo);
if (!RelFileNodeBackendIsTemp(rnode)) if (!RelFileNodeBackendIsTemp(rnode))
{ {
#if PG_VERSION_NUM >= 160000
forget_cached_relsize(rnode.locator, forkNum);
#else
forget_cached_relsize(rnode.node, forkNum); forget_cached_relsize(rnode.node, forkNum);
#endif
} }
} }
@@ -1658,6 +1685,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
XLogRecPtr lsn; XLogRecPtr lsn;
BlockNumber n_blocks = 0; BlockNumber n_blocks = 0;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -1707,17 +1735,16 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
neon_wallog_page(reln, forkNum, n_blocks++, buffer, true); neon_wallog_page(reln, forkNum, n_blocks++, buffer, true);
neon_wallog_page(reln, forkNum, blkno, buffer, false); neon_wallog_page(reln, forkNum, blkno, buffer, false);
set_cached_relsize(reln->smgr_rnode.node, forkNum, blkno + 1); set_cached_relsize(rnode, forkNum, blkno + 1);
lsn = PageGetLSN(buffer); lsn = PageGetLSN(buffer);
elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", elog(SmgrTrace, "smgrextend called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln), RelnGetRelNumber(reln),
reln->smgr_rnode.node.relNode,
forkNum, blkno, forkNum, blkno,
(uint32) (lsn >> 32), (uint32) lsn); (uint32) (lsn >> 32), (uint32) lsn);
lfc_write(reln->smgr_rnode.node, forkNum, blkno, buffer); lfc_write(rnode, forkNum, blkno, buffer);
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -1732,9 +1759,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (lsn == InvalidXLogRecPtr) if (lsn == InvalidXLogRecPtr)
{ {
lsn = GetXLogInsertRecPtr(); lsn = GetXLogInsertRecPtr();
SetLastWrittenLSNForBlock(lsn, reln->smgr_rnode.node, forkNum, blkno); SetLastWrittenLSNForBlock(lsn, rnode, forkNum, blkno);
} }
SetLastWrittenLSNForRelation(lsn, reln->smgr_rnode.node, forkNum); SetLastWrittenLSNForRelation(lsn, rnode, forkNum);
} }
/* /*
@@ -1778,6 +1805,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
BufferTag tag; BufferTag tag;
uint64 ring_index PG_USED_FOR_ASSERTS_ONLY; uint64 ring_index PG_USED_FOR_ASSERTS_ONLY;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: /* probably shouldn't happen, but ignore it */ case 0: /* probably shouldn't happen, but ignore it */
@@ -1792,15 +1821,18 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (lfc_cache_contains(reln->smgr_rnode.node, forknum, blocknum)) if (lfc_cache_contains(rnode, forknum, blocknum))
return false; return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forknum, blocknum);
#else
tag = (BufferTag) { tag = (BufferTag) {
.rnode = reln->smgr_rnode.node, .rnode = rnode,
.forkNum = forknum, .forkNum = forknum,
.blockNum = blocknum .blockNum = blocknum
}; };
#endif
ring_index = prefetch_register_buffer(tag, NULL, NULL); ring_index = prefetch_register_buffer(tag, NULL, NULL);
Assert(ring_index < MyPState->ring_unused && Assert(ring_index < MyPState->ring_unused &&
@@ -1861,11 +1893,15 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
PrfHashEntry *entry; PrfHashEntry *entry;
PrefetchRequest *slot; PrefetchRequest *slot;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&buftag, &rnode, forkNum, blkno);
#else
buftag = (BufferTag) { buftag = (BufferTag) {
.rnode = rnode, .rnode = rnode,
.forkNum = forkNum, .forkNum = forkNum,
.blockNum = blkno, .blockNum = blkno
}; };
#endif
/* /*
* The redo process does not lock pages that it needs to replay but are * The redo process does not lock pages that it needs to replay but are
@@ -1965,9 +2001,9 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg("could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
blkno, blkno,
rnode.spcNode, RnodeGetSpcOid(rnode),
rnode.dbNode, RnodeGetDbOid(rnode),
rnode.relNode, RnodeGetRelNumber(rnode),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s", errdetail("page server returned error: %s",
@@ -1991,6 +2027,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
bool latest; bool latest;
XLogRecPtr request_lsn; XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -2010,13 +2047,13 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
} }
/* Try to read from local file cache */ /* Try to read from local file cache */
if (lfc_read(reln->smgr_rnode.node, forkNum, blkno, buffer)) if (lfc_read(RelnGetRnode(reln), forkNum, blkno, buffer))
{ {
return; return;
} }
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forkNum, blkno); request_lsn = neon_get_request_lsn(&latest, rnode, forkNum, blkno);
neon_read_at_lsn(reln->smgr_rnode.node, forkNum, blkno, request_lsn, latest, buffer); neon_read_at_lsn(rnode, forkNum, blkno, request_lsn, latest, buffer);
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -2036,9 +2073,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", elog(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno, blkno,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(buffer)); hexdump_page(buffer));
@@ -2048,9 +2085,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n", elog(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno, blkno,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf)); hexdump_page(mdbuf));
@@ -2065,9 +2102,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", elog(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno, blkno,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked), hexdump_page(mdbuf_masked),
@@ -2086,9 +2123,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{ {
elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n", elog(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno, blkno,
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forkNum, forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked), hexdump_page(mdbuf_masked),
@@ -2133,7 +2170,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer, bool skipFsync) char *buffer, bool skipFsync)
{ {
XLogRecPtr lsn; XLogRecPtr lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
case 0: case 0:
@@ -2170,13 +2207,12 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
lsn = PageGetLSN(buffer); lsn = PageGetLSN(buffer);
elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X", elog(SmgrTrace, "smgrwrite called for %u/%u/%u.%u blk %u, page LSN: %X/%08X",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln), RelnGetRelNumber(reln),
reln->smgr_rnode.node.relNode,
forknum, blocknum, forknum, blocknum,
(uint32) (lsn >> 32), (uint32) lsn); (uint32) (lsn >> 32), (uint32) lsn);
lfc_write(reln->smgr_rnode.node, forknum, blocknum, buffer); lfc_write(rnode, forknum, blocknum, buffer);
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -2194,6 +2230,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
BlockNumber n_blocks; BlockNumber n_blocks;
bool latest; bool latest;
XLogRecPtr request_lsn; XLogRecPtr request_lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -2212,23 +2249,23 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
if (get_cached_relsize(reln->smgr_rnode.node, forknum, &n_blocks)) if (get_cached_relsize(RelnGetRnode(reln), forknum, &n_blocks))
{ {
elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks", elog(SmgrTrace, "cached nblocks for %u/%u/%u.%u: %u blocks",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, n_blocks); forknum, n_blocks);
return n_blocks; return n_blocks;
} }
request_lsn = neon_get_request_lsn(&latest, reln->smgr_rnode.node, forknum, REL_METADATA_PSEUDO_BLOCKNO); request_lsn = neon_get_request_lsn(&latest, rnode, forknum, REL_METADATA_PSEUDO_BLOCKNO);
{ {
NeonNblocksRequest request = { NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest, .req.tag = T_NeonNblocksRequest,
.req.latest = latest, .req.latest = latest,
.req.lsn = request_lsn, .req.lsn = request_lsn,
.rnode = reln->smgr_rnode.node, .rnode = rnode,
.forknum = forknum, .forknum = forknum,
}; };
@@ -2245,9 +2282,9 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", errmsg("could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s", errdetail("page server returned error: %s",
@@ -2257,12 +2294,11 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
default: default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
} }
update_cached_relsize(reln->smgr_rnode.node, forknum, n_blocks); update_cached_relsize(rnode, forknum, n_blocks);
elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", elog(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln), RelnGetRelNumber(reln),
reln->smgr_rnode.node.relNode,
forknum, forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
n_blocks); n_blocks);
@@ -2275,7 +2311,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
* neon_db_size() -- Get the size of the database in bytes. * neon_db_size() -- Get the size of the database in bytes.
*/ */
int64 int64
neon_dbsize(Oid dbNode) neon_dbsize(Oid dbOid)
{ {
NeonResponse *resp; NeonResponse *resp;
int64 db_size; int64 db_size;
@@ -2289,7 +2325,7 @@ neon_dbsize(Oid dbNode)
.req.tag = T_NeonDbSizeRequest, .req.tag = T_NeonDbSizeRequest,
.req.latest = latest, .req.latest = latest,
.req.lsn = request_lsn, .req.lsn = request_lsn,
.dbNode = dbNode, .dbOid = dbOid,
}; };
resp = page_server_request(&request); resp = page_server_request(&request);
@@ -2305,7 +2341,7 @@ neon_dbsize(Oid dbNode)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_IO_ERROR), (errcode(ERRCODE_IO_ERROR),
errmsg("could not read db size of db %u from page server at lsn %X/%08X", errmsg("could not read db size of db %u from page server at lsn %X/%08X",
dbNode, dbOid,
(uint32) (request_lsn >> 32), (uint32) request_lsn), (uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s", errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message))); ((NeonErrorResponse *) resp)->message)));
@@ -2316,7 +2352,7 @@ neon_dbsize(Oid dbNode)
} }
elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", elog(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
dbNode, dbOid,
(uint32) (request_lsn >> 32), (uint32) request_lsn, (uint32) (request_lsn >> 32), (uint32) request_lsn,
db_size); db_size);
@@ -2331,6 +2367,7 @@ void
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{ {
XLogRecPtr lsn; XLogRecPtr lsn;
RelFileNode rnode = RelnGetRnode(reln);
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -2350,7 +2387,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
set_cached_relsize(reln->smgr_rnode.node, forknum, nblocks); set_cached_relsize(rnode, forknum, nblocks);
/* /*
* Truncating a relation drops all its buffers from the buffer cache * Truncating a relation drops all its buffers from the buffer cache
@@ -2378,7 +2415,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
* for the extended pages, so there's no harm in leaving behind obsolete * for the extended pages, so there's no harm in leaving behind obsolete
* entries for the truncated chunks. * entries for the truncated chunks.
*/ */
SetLastWrittenLSNForRelation(lsn, reln->smgr_rnode.node, forknum); SetLastWrittenLSNForRelation(lsn, rnode, forknum);
#ifdef DEBUG_COMPARE_LOCAL #ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln)) if (IS_LOCAL_REL(reln))
@@ -2448,9 +2485,9 @@ neon_start_unlogged_build(SMgrRelation reln)
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("starting unlogged build of relation %u/%u/%u", (errmsg("starting unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode))); RelnGetRelNumber(reln))));
switch (reln->smgr_relpersistence) switch (reln->smgr_relpersistence)
{ {
@@ -2500,9 +2537,9 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u", (errmsg("finishing phase 1 of unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode))); RelnGetRelNumber(reln))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT) if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return; return;
@@ -2529,9 +2566,9 @@ neon_end_unlogged_build(SMgrRelation reln)
ereport(SmgrTrace, ereport(SmgrTrace,
(errmsg("ending unlogged build of relation %u/%u/%u", (errmsg("ending unlogged build of relation %u/%u/%u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode))); RelnGetRelNumber(reln))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT) if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{ {
@@ -2544,16 +2581,24 @@ neon_end_unlogged_build(SMgrRelation reln)
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT; reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */ /* Remove local copy */
rnode = reln->smgr_rnode; #if PG_VERSION_NUM >= 160000
rnode.locator = RelnGetRnode(reln);
#else
rnode.node = RelnGetRnode(reln);
#endif
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++) for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{ {
elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u", elog(SmgrTrace, "forgetting cached relsize for %u/%u/%u.%u",
rnode.node.spcNode, RelnGetSpcOid(reln),
rnode.node.dbNode, RelnGetDbOid(reln),
rnode.node.relNode, RelnGetRelNumber(reln),
forknum); forknum);
#if PG_VERSION_NUM >= 160000
forget_cached_relsize(rnode.locator, forknum);
#else
forget_cached_relsize(rnode.node, forknum); forget_cached_relsize(rnode.node, forknum);
#endif
mdclose(reln, forknum); mdclose(reln, forknum);
/* use isRedo == true, so that we drop it immediately */ /* use isRedo == true, so that we drop it immediately */
mdunlink(rnode, forknum, true); mdunlink(rnode, forknum, true);
@@ -2706,10 +2751,16 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
* regardless of whether the block is stored in shared buffers. * regardless of whether the block is stored in shared buffers.
* See also this function's top comment. * See also this function's top comment.
*/ */
if (!OidIsValid(rnode.dbNode))
if (!OidIsValid(RnodeGetDbOid(rnode)))
return false; return false;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&tag, &rnode, forknum, blkno);
#else
INIT_BUFFERTAG(tag, rnode, forknum, blkno); INIT_BUFFERTAG(tag, rnode, forknum, blkno);
#endif
hash = BufTableHashCode(&tag); hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash); partitionLock = BufMappingPartitionLock(hash);

View File

@@ -15,7 +15,11 @@
#include "postgres.h" #include "postgres.h"
#include "pagestore_client.h" #include "pagestore_client.h"
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
#endif
#include "storage/smgr.h" #include "storage/smgr.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/ipc.h" #include "storage/ipc.h"
@@ -28,6 +32,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#endif #endif
typedef struct typedef struct
{ {
RelFileNode rnode; RelFileNode rnode;

View File

@@ -1394,7 +1394,12 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec
WalReceiverConn *wrconn; WalReceiverConn *wrconn;
WalRcvStreamOptions options; WalRcvStreamOptions options;
#if PG_VERSION_NUM >= 160000
bool must_use_password = false;
wrconn = walrcv_connect(safekeeper[donor].conninfo, false, must_use_password, "wal_proposer_recovery", &err);
#else
wrconn = walrcv_connect(safekeeper[donor].conninfo, false, "wal_proposer_recovery", &err); wrconn = walrcv_connect(safekeeper[donor].conninfo, false, "wal_proposer_recovery", &err);
#endif
if (!wrconn) if (!wrconn)
{ {
ereport(WARNING, ereport(WARNING,

View File

@@ -26,6 +26,10 @@
#include "access/xlogrecovery.h" #include "access/xlogrecovery.h"
#endif #endif
#if PG_VERSION_NUM >= 160000
#include "utils/guc.h"
#endif
/* /*
* These variables are used similarly to openLogFile/SegNo, * These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID * but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID

View File

@@ -128,7 +128,11 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
else else
isvalid = false; isvalid = false;
bufferid = BufferDescriptorGetBuffer(bufHdr); bufferid = BufferDescriptorGetBuffer(bufHdr);
#if PG_VERSION_NUM >= 160000
rnode = BufTagGetRelFileLocator(&bufHdr->tag);
#else
rnode = bufHdr->tag.rnode; rnode = bufHdr->tag.rnode;
#endif
forknum = bufHdr->tag.forkNum; forknum = bufHdr->tag.forkNum;
blocknum = bufHdr->tag.blockNum; blocknum = bufHdr->tag.blockNum;
@@ -238,7 +242,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ); SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page); raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(rel->rd_node, forknum, blkno, read_lsn, request_latest, raw_page_data); neon_read_at_lsn(RelnGetRnode(RelationGetSmgr(rel)), forknum, blkno, read_lsn, request_latest, raw_page_data);
relation_close(rel, AccessShareLock); relation_close(rel, AccessShareLock);
@@ -267,11 +271,17 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
PG_RETURN_NULL(); PG_RETURN_NULL();
{ {
#if PG_VERSION_NUM >= 160000
RelFileLocator rnode = {
.spcOid = PG_GETARG_OID(0),
.dbOid = PG_GETARG_OID(1),
.relNumber = PG_GETARG_OID(2)};
#else
RelFileNode rnode = { RelFileNode rnode = {
.spcNode = PG_GETARG_OID(0), .spcNode = PG_GETARG_OID(0),
.dbNode = PG_GETARG_OID(1), .dbNode = PG_GETARG_OID(1),
.relNode = PG_GETARG_OID(2)}; .relNode = PG_GETARG_OID(2)};
#endif
ForkNumber forknum = PG_GETARG_UINT32(3); ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4); uint32 blkno = PG_GETARG_UINT32(4);

View File

@@ -21,7 +21,6 @@
#include "access/xlog.h" #include "access/xlog.h"
#include "storage/block.h" #include "storage/block.h"
#include "storage/buf_internals.h" #include "storage/buf_internals.h"
#include "storage/relfilenode.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#if PG_VERSION_NUM >= 150000 #if PG_VERSION_NUM >= 150000
@@ -30,6 +29,7 @@
#include "inmem_smgr.h" #include "inmem_smgr.h"
/* Size of the in-memory smgr */ /* Size of the in-memory smgr */
#define MAX_PAGES 64 #define MAX_PAGES 64
@@ -46,12 +46,22 @@ locate_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno)
/* We only hold a small number of pages, so linear search */ /* We only hold a small number of pages, so linear search */
for (int i = 0; i < used_pages; i++) for (int i = 0; i < used_pages; i++)
{ {
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode)
#if PG_VERSION_NUM >= 160000
if (BufTagMatchesRelFileLocator(&page_tag[i], &reln->smgr_rlocator.locator)
&& forknum == page_tag[i].forkNum && forknum == page_tag[i].forkNum
&& blkno == page_tag[i].blockNum) && blkno == page_tag[i].blockNum)
{ {
return i; return i;
} }
#else
if (RelFileNodeEquals(RelnGetRnode(reln), page_tag[i].rnode)
&& forknum == page_tag[i].forkNum
&& blkno == page_tag[i].blockNum)
{
return i;
}
#endif
} }
return -1; return -1;
} }
@@ -97,8 +107,12 @@ inmem_exists(SMgrRelation reln, ForkNumber forknum)
{ {
for (int i = 0; i < used_pages; i++) for (int i = 0; i < used_pages; i++)
{ {
if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode) #if PG_VERSION_NUM >= 160000
&& forknum == page_tag[i].forkNum) if (BufTagMatchesRelFileLocator(&page_tag[i], &reln->smgr_rlocator.locator)
#else
if (RelFileNodeEquals(RelnGetRnode(reln), page_tag[i].rnode)
#endif
&& forknum == page_tag[i].forkNum)
{ {
return true; return true;
} }
@@ -216,9 +230,9 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/ */
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1, elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u", "inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, forknum,
blocknum, blocknum,
used_pages); used_pages);
@@ -227,14 +241,19 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
pg = used_pages; pg = used_pages;
used_pages++; used_pages++;
INIT_BUFFERTAG(page_tag[pg], reln->smgr_rnode.node, forknum, blocknum);
#if PG_VERSION_NUM >= 160000
InitBufferTag(&page_tag[pg], &RelnGetRnode(reln), forknum, blocknum);
#else
INIT_BUFFERTAG(page_tag[pg], RelnGetRnode(reln), forknum, blocknum);
#endif
} }
else else
{ {
elog(DEBUG1, "inmem_write() called for %u/%u/%u.%u blk %u: found at %u", elog(DEBUG1, "inmem_write() called for %u/%u/%u.%u blk %u: found at %u",
reln->smgr_rnode.node.spcNode, RelnGetSpcOid(reln),
reln->smgr_rnode.node.dbNode, RelnGetDbOid(reln),
reln->smgr_rnode.node.relNode, RelnGetRelNumber(reln),
forknum, forknum,
blocknum, blocknum,
used_pages); used_pages);

View File

@@ -11,6 +11,40 @@
#ifndef INMEM_SMGR_H #ifndef INMEM_SMGR_H
#define INMEM_SMGR_H #define INMEM_SMGR_H
#if PG_VERSION_NUM >= 160000
#include "storage/relfilelocator.h"
#else
#include "storage/relfilenode.h"
#endif
// This is a hack to avoid too many ifdefs in the function definitions.
#if PG_VERSION_NUM >= 160000
typedef RelFileLocator RelFileNode;
typedef RelFileLocatorBackend RelFileNodeBackend;
#define RelFileNodeBackendIsTemp RelFileLocatorBackendIsTemp
#endif
#if PG_VERSION_NUM >= 160000
#define RelnGetRnode(reln) (reln->smgr_rlocator.locator)
#define RnodeGetSpcOid(rnode) (rnode.spcOid)
#define RnodeGetDbOid(rnode) (rnode.dbOid)
#define RnodeGetRelNumber(rnode) (rnode.relNumber)
#define BufTagGetRnode(tag) (BufTagGetRelFileLocator(&tag))
#else
#define RelnGetRnode(reln) (reln->smgr_rnode.node)
#define RnodeGetSpcOid(rnode) (rnode.spcNode)
#define RnodeGetDbOid(rnode) (rnode.dbNode)
#define RnodeGetRelNumber(rnode) (rnode.relNode)
#define BufTagGetRnode(tag) (tag.rnode)
#endif
#define RelnGetSpcOid(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
#define RelnGetDbOid(reln) (RnodeGetDbOid(RelnGetRnode(reln)))
#define RelnGetRelNumber(reln) (RnodeGetRelNumber(RelnGetRnode(reln)))
extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode); extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode);
extern void smgr_init_inmem(void); extern void smgr_init_inmem(void);

View File

@@ -62,8 +62,10 @@
#endif #endif
#ifndef HAVE_GETRUSAGE #ifndef HAVE_GETRUSAGE
#if PG_VERSION_NUM < 160000
#include "rusagestub.h" #include "rusagestub.h"
#endif #endif
#endif
#include "access/clog.h" #include "access/clog.h"
#include "access/commit_ts.h" #include "access/commit_ts.h"
@@ -117,6 +119,7 @@
#include "neon_seccomp.h" #include "neon_seccomp.h"
#endif #endif
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
static int ReadRedoCommand(StringInfo inBuf); static int ReadRedoCommand(StringInfo inBuf);
@@ -662,18 +665,31 @@ BeginRedoForBlock(StringInfo input_message)
* BlockNumber * BlockNumber
*/ */
forknum = pq_getmsgbyte(input_message); forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4); rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4); rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4); rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4); blknum = pq_getmsgint(input_message, 4);
wal_redo_buffer = InvalidBuffer; wal_redo_buffer = InvalidBuffer;
#if PG_VERSION_NUM >= 160000
InitBufferTag(&target_redo_tag, &rnode, forknum, blknum);
#else
INIT_BUFFERTAG(target_redo_tag, rnode, forknum, blknum); INIT_BUFFERTAG(target_redo_tag, rnode, forknum, blknum);
#endif
elog(TRACE, "BeginRedoForBlock %u/%u/%u.%d blk %u", elog(TRACE, "BeginRedoForBlock %u/%u/%u.%d blk %u",
target_redo_tag.rnode.spcNode, #if PG_VERSION_NUM >= 160000
target_redo_tag.rnode.dbNode, target_redo_tag.spcOid, target_redo_tag.dbOid, target_redo_tag.relNumber,
target_redo_tag.rnode.relNode, #else
target_redo_tag.rnode.spcNode, target_redo_tag.rnode.dbNode, target_redo_tag.rnode.relNode,
#endif
target_redo_tag.forkNum, target_redo_tag.forkNum,
target_redo_tag.blockNum); target_redo_tag.blockNum);
@@ -709,9 +725,15 @@ PushPage(StringInfo input_message)
* 8k page content * 8k page content
*/ */
forknum = pq_getmsgbyte(input_message); forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4); rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4); rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4); rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4); blknum = pq_getmsgint(input_message, 4);
content = pq_getmsgbytes(input_message, BLCKSZ); content = pq_getmsgbytes(input_message, BLCKSZ);
@@ -831,7 +853,12 @@ ApplyRecord(StringInfo input_message)
*/ */
if (BufferIsInvalid(wal_redo_buffer)) if (BufferIsInvalid(wal_redo_buffer))
{ {
wal_redo_buffer = NeonRedoReadBuffer(target_redo_tag.rnode, wal_redo_buffer = NeonRedoReadBuffer(
#if PG_VERSION_NUM >= 160000
BufTagGetRelFileLocator(&target_redo_tag),
#else
target_redo_tag.rnode,
#endif
target_redo_tag.forkNum, target_redo_tag.forkNum,
target_redo_tag.blockNum, target_redo_tag.blockNum,
RBM_NORMAL); RBM_NORMAL);
@@ -873,12 +900,43 @@ apply_error_callback(void *arg)
} }
#if PG_VERSION_NUM >= 160000
static bool static bool
redo_block_filter(XLogReaderState *record, uint8 block_id) redo_block_filter(XLogReaderState *record, uint8 block_id)
{ {
BufferTag target_tag; BufferTag target_tag;
RelFileLocator rlocator;
XLogRecGetBlockTag(record, block_id,
&rlocator, &target_tag.forkNum, &target_tag.blockNum);
target_tag.spcOid = rlocator.spcOid;
target_tag.dbOid = rlocator.dbOid;
target_tag.relNumber = rlocator.relNumber;
/*
* Can a WAL redo function ever access a relation other than the one that
* it modifies? I don't see why it would.
*/
if (RelFileLocatorEquals(BufTagGetRelFileLocator(&target_tag), BufTagGetRelFileLocator(&target_redo_tag)))
elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u",
target_tag.spcOid, target_tag.dbOid, target_tag.relNumber,
target_tag.forkNum, target_tag.blockNum);
/*
* If this block isn't one we are currently restoring, then return 'true'
* so that this gets ignored
*/
return !BufferTagsEqual(&target_tag, &target_redo_tag);
}
#else
static bool
redo_block_filter(XLogReaderState *record, uint8 block_id)
{
BufferTag target_tag;
#if PG_VERSION_NUM >= 150000 #if PG_VERSION_NUM >= 150000
XLogRecGetBlockTag(record, block_id, XLogRecGetBlockTag(record, block_id,
&target_tag.rnode, &target_tag.forkNum, &target_tag.blockNum); &target_tag.rnode, &target_tag.forkNum, &target_tag.blockNum);
@@ -897,14 +955,18 @@ redo_block_filter(XLogReaderState *record, uint8 block_id)
*/ */
if (!RelFileNodeEquals(target_tag.rnode, target_redo_tag.rnode)) if (!RelFileNodeEquals(target_tag.rnode, target_redo_tag.rnode))
elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u", elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u",
target_tag.rnode.spcNode, target_tag.rnode.dbNode, target_tag.rnode.relNode, target_tag.forkNum, target_tag.blockNum); target_tag.rnode.spcNode, target_tag.rnode.dbNode, target_tag.rnode.relNode,
target_tag.forkNum, target_tag.blockNum);
/* /*
* If this block isn't one we are currently restoring, then return 'true' * If this block isn't one we are currently restoring, then return 'true'
* so that this gets ignored * so that this gets ignored
*/ */
return !BUFFERTAGS_EQUAL(target_tag, target_redo_tag); return !BUFFERTAGS_EQUAL(target_tag, target_redo_tag);
} }
#endif
/* /*
* Get a page image back from buffer cache. * Get a page image back from buffer cache.
@@ -931,9 +993,15 @@ GetPage(StringInfo input_message)
* BlockNumber * BlockNumber
*/ */
forknum = pq_getmsgbyte(input_message); forknum = pq_getmsgbyte(input_message);
#if PG_VERSION_NUM >= 160000
rnode.spcOid = pq_getmsgint(input_message, 4);
rnode.dbOid = pq_getmsgint(input_message, 4);
rnode.relNumber = pq_getmsgint(input_message, 4);
#else
rnode.spcNode = pq_getmsgint(input_message, 4); rnode.spcNode = pq_getmsgint(input_message, 4);
rnode.dbNode = pq_getmsgint(input_message, 4); rnode.dbNode = pq_getmsgint(input_message, 4);
rnode.relNode = pq_getmsgint(input_message, 4); rnode.relNode = pq_getmsgint(input_message, 4);
#endif
blknum = pq_getmsgint(input_message, 4); blknum = pq_getmsgint(input_message, 4);
/* FIXME: check that we got a BeginRedoForBlock message or this earlier */ /* FIXME: check that we got a BeginRedoForBlock message or this earlier */
@@ -961,7 +1029,11 @@ GetPage(StringInfo input_message)
} while (tot_written < BLCKSZ); } while (tot_written < BLCKSZ);
ReleaseBuffer(buf); ReleaseBuffer(buf);
#if PG_VERSION_NUM >= 160000
DropRelationAllLocalBuffers(rnode);
#else
DropRelFileNodeAllLocalBuffers(rnode); DropRelFileNodeAllLocalBuffers(rnode);
#endif
wal_redo_buffer = InvalidBuffer; wal_redo_buffer = InvalidBuffer;
elog(TRACE, "Page sent back for block %u", blknum); elog(TRACE, "Page sent back for block %u", blknum);

View File

@@ -149,6 +149,11 @@ impl PhysicalStorage {
wal_seg_size, wal_seg_size,
state.commit_lsn, state.commit_lsn,
)?, )?,
16 => postgres_ffi::v16::xlog_utils::find_end_of_wal(
&timeline_dir,
wal_seg_size,
state.commit_lsn,
)?,
_ => bail!("unsupported postgres version: {}", state.server.pg_version), _ => bail!("unsupported postgres version: {}", state.server.pg_version),
} }
}; };

View File

@@ -17,6 +17,7 @@ This fixture is used to determine which version of Postgres to use for tests.
class PgVersion(str, enum.Enum): class PgVersion(str, enum.Enum):
V14 = "14" V14 = "14"
V15 = "15" V15 = "15"
V16 = "16"
# Instead of making version an optional parameter in methods, we can use this fake entry # Instead of making version an optional parameter in methods, we can use this fake entry
# to explicitly rely on the default server version (could be different from pg_version fixture value) # to explicitly rely on the default server version (could be different from pg_version fixture value)
NOT_SET = "<-POSTRGRES VERSION IS NOT SET->" NOT_SET = "<-POSTRGRES VERSION IS NOT SET->"

1
vendor/postgres-v16 vendored Submodule

Submodule vendor/postgres-v16 added at 346980e0fa