mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Move walredo process code under pgxn in the main 'neon' repository.
- Refactor the way the WalProposerMain function is called when started with --sync-safekeepers. The postgres binary now explicitly loads the 'neon.so' library and calls the WalProposerMain in it. This is simpler than the global function callback "hook" we previously used. - Move the WAL redo process code to a new library, neon_walredo.so, and use the same mechanism as for --sync-safekeepers to call the WalRedoMain function, when launched with --walredo argument. - Also move the seccomp code to neon_walredo.so library. I kept the configure check in the postgres side for now, though.
This commit is contained in:
10
Makefile
10
Makefile
@@ -151,6 +151,11 @@ neon-pg-ext-v14: postgres-v14
|
|||||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v14 && \
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v14 && \
|
||||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
||||||
|
+@echo "Compiling neon_walredo v14"
|
||||||
|
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14
|
||||||
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14 && \
|
||||||
|
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||||
|
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
|
||||||
+@echo "Compiling neon_test_utils" v14
|
+@echo "Compiling neon_test_utils" v14
|
||||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14
|
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14
|
||||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14 && \
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14 && \
|
||||||
@@ -163,6 +168,11 @@ neon-pg-ext-v15: postgres-v15
|
|||||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v15 && \
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v15 && \
|
||||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
||||||
|
+@echo "Compiling neon_walredo v15"
|
||||||
|
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15
|
||||||
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15 && \
|
||||||
|
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||||
|
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
|
||||||
+@echo "Compiling neon_test_utils" v15
|
+@echo "Compiling neon_test_utils" v15
|
||||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15
|
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15
|
||||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15 && \
|
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15 && \
|
||||||
|
|||||||
@@ -52,6 +52,10 @@ PostgreSQL extension that implements storage manager API and network communicati
|
|||||||
|
|
||||||
PostgreSQL extension that contains functions needed for testing and debugging.
|
PostgreSQL extension that contains functions needed for testing and debugging.
|
||||||
|
|
||||||
|
`/pgxn/neon_walredo`:
|
||||||
|
|
||||||
|
Library to run Postgres as a "WAL redo process" in the pageserver.
|
||||||
|
|
||||||
`/safekeeper`:
|
`/safekeeper`:
|
||||||
|
|
||||||
The neon WAL service that receives WAL from a primary compute node and streams it to the pageserver.
|
The neon WAL service that receives WAL from a primary compute node and streams it to the pageserver.
|
||||||
|
|||||||
@@ -10,7 +10,7 @@
|
|||||||
//! process. Then we get the page image back. Communication with the
|
//! process. Then we get the page image back. Communication with the
|
||||||
//! postgres process happens via stdin/stdout
|
//! postgres process happens via stdin/stdout
|
||||||
//!
|
//!
|
||||||
//! See src/backend/tcop/zenith_wal_redo.c for the other side of
|
//! See pgxn/neon_walredo/walredoproc.c for the other side of
|
||||||
//! this communication.
|
//! this communication.
|
||||||
//!
|
//!
|
||||||
//! The Postgres process is assumed to be secure against malicious WAL
|
//! The Postgres process is assumed to be secure against malicious WAL
|
||||||
@@ -644,14 +644,12 @@ impl PostgresRedoProcess {
|
|||||||
),
|
),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
// Limit shared cache for wal-redo-postres
|
// Limit shared cache for wal-redo-postgres
|
||||||
let mut config = OpenOptions::new()
|
let mut config = OpenOptions::new()
|
||||||
.append(true)
|
.append(true)
|
||||||
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
|
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
|
||||||
config.write_all(b"shared_buffers=128kB\n")?;
|
config.write_all(b"shared_buffers=128kB\n")?;
|
||||||
config.write_all(b"fsync=off\n")?;
|
config.write_all(b"fsync=off\n")?;
|
||||||
config.write_all(b"shared_preload_libraries=neon\n")?;
|
|
||||||
config.write_all(b"neon.wal_redo=on\n")?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start postgres itself
|
// Start postgres itself
|
||||||
@@ -664,10 +662,11 @@ impl PostgresRedoProcess {
|
|||||||
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
|
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
|
||||||
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
|
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
|
||||||
.env("PGDATA", &datadir)
|
.env("PGDATA", &datadir)
|
||||||
// The redo process is not trusted, so it runs in seccomp mode
|
// The redo process is not trusted, and runs in seccomp mode that
|
||||||
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't
|
// doesn't allow it to open any files. We have to also make sure it
|
||||||
// inherit any file descriptors from the pageserver that would allow
|
// doesn't inherit any file descriptors from the pageserver, that
|
||||||
// an attacker to do bad things.
|
// would allow an attacker to read any files that happen to be open
|
||||||
|
// in the pageserver.
|
||||||
//
|
//
|
||||||
// The Rust standard library makes sure to mark any file descriptors
|
// The Rust standard library makes sure to mark any file descriptors
|
||||||
// as close-on-exec by default, but that's not enough, since we use
|
// as close-on-exec by default, but that's not enough, since we use
|
||||||
@@ -844,7 +843,7 @@ impl PostgresRedoProcess {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Functions for constructing messages to send to the postgres WAL redo
|
// Functions for constructing messages to send to the postgres WAL redo
|
||||||
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
// process. See pgxn/neon_walredo/walredoproc.c for
|
||||||
// explanation of the protocol.
|
// explanation of the protocol.
|
||||||
|
|
||||||
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
|
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@
|
|||||||
MODULE_big = neon
|
MODULE_big = neon
|
||||||
OBJS = \
|
OBJS = \
|
||||||
$(WIN32RES) \
|
$(WIN32RES) \
|
||||||
inmem_smgr.o \
|
|
||||||
libpagestore.o \
|
libpagestore.o \
|
||||||
libpqwalproposer.o \
|
libpqwalproposer.o \
|
||||||
pagestore_smgr.o \
|
pagestore_smgr.o \
|
||||||
|
|||||||
@@ -419,15 +419,6 @@ pg_init_libpagestore(void)
|
|||||||
0, /* no flags required */
|
0, /* no flags required */
|
||||||
check_neon_id, NULL, NULL);
|
check_neon_id, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable("neon.wal_redo",
|
|
||||||
"start in wal-redo mode",
|
|
||||||
NULL,
|
|
||||||
&wal_redo,
|
|
||||||
false,
|
|
||||||
PGC_POSTMASTER,
|
|
||||||
0,
|
|
||||||
NULL, NULL, NULL);
|
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.max_cluster_size",
|
DefineCustomIntVariable("neon.max_cluster_size",
|
||||||
"cluster size limit",
|
"cluster size limit",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -452,13 +443,7 @@ pg_init_libpagestore(void)
|
|||||||
neon_timeline_walproposer = neon_timeline;
|
neon_timeline_walproposer = neon_timeline;
|
||||||
neon_tenant_walproposer = neon_tenant;
|
neon_tenant_walproposer = neon_tenant;
|
||||||
|
|
||||||
if (wal_redo)
|
if (page_server_connstring && page_server_connstring[0])
|
||||||
{
|
|
||||||
neon_log(PageStoreTrace, "set inmem_smgr hook");
|
|
||||||
smgr_hook = smgr_inmem;
|
|
||||||
smgr_init_hook = smgr_init_inmem;
|
|
||||||
}
|
|
||||||
else if (page_server_connstring && page_server_connstring[0])
|
|
||||||
{
|
{
|
||||||
neon_log(PageStoreTrace, "set neon_smgr hook");
|
neon_log(PageStoreTrace, "set neon_smgr hook");
|
||||||
smgr_hook = smgr_neon;
|
smgr_hook = smgr_neon;
|
||||||
|
|||||||
@@ -155,10 +155,6 @@ extern int32 max_cluster_size;
|
|||||||
extern const f_smgr *smgr_neon(BackendId backend, RelFileNode rnode);
|
extern const f_smgr *smgr_neon(BackendId backend, RelFileNode rnode);
|
||||||
extern void smgr_init_neon(void);
|
extern void smgr_init_neon(void);
|
||||||
|
|
||||||
extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode);
|
|
||||||
extern void smgr_init_inmem(void);
|
|
||||||
extern void smgr_shutdown_inmem(void);
|
|
||||||
|
|
||||||
/* Neon storage manager functionality */
|
/* Neon storage manager functionality */
|
||||||
|
|
||||||
extern void neon_init(void);
|
extern void neon_init(void);
|
||||||
@@ -188,29 +184,6 @@ extern void neon_truncate(SMgrRelation reln, ForkNumber forknum,
|
|||||||
BlockNumber nblocks);
|
BlockNumber nblocks);
|
||||||
extern void neon_immedsync(SMgrRelation reln, ForkNumber forknum);
|
extern void neon_immedsync(SMgrRelation reln, ForkNumber forknum);
|
||||||
|
|
||||||
/* neon wal-redo storage manager functionality */
|
|
||||||
|
|
||||||
extern void inmem_init(void);
|
|
||||||
extern void inmem_open(SMgrRelation reln);
|
|
||||||
extern void inmem_close(SMgrRelation reln, ForkNumber forknum);
|
|
||||||
extern void inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo);
|
|
||||||
extern bool inmem_exists(SMgrRelation reln, ForkNumber forknum);
|
|
||||||
extern void inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
|
|
||||||
extern void inmem_extend(SMgrRelation reln, ForkNumber forknum,
|
|
||||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
|
||||||
extern bool inmem_prefetch(SMgrRelation reln, ForkNumber forknum,
|
|
||||||
BlockNumber blocknum);
|
|
||||||
extern void inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
|
||||||
char *buffer);
|
|
||||||
extern void inmem_write(SMgrRelation reln, ForkNumber forknum,
|
|
||||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
|
||||||
extern void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
|
||||||
BlockNumber blocknum, BlockNumber nblocks);
|
|
||||||
extern BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
|
|
||||||
extern void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
|
|
||||||
BlockNumber nblocks);
|
|
||||||
extern void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
|
|
||||||
|
|
||||||
/* utils for neon relsize cache */
|
/* utils for neon relsize cache */
|
||||||
extern void relsize_hash_init(void);
|
extern void relsize_hash_init(void);
|
||||||
extern bool get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber *size);
|
extern bool get_cached_relsize(RelFileNode rnode, ForkNumber forknum, BlockNumber *size);
|
||||||
|
|||||||
@@ -99,7 +99,6 @@ char *page_server_connstring;
|
|||||||
/*with substituted password*/
|
/*with substituted password*/
|
||||||
char *neon_timeline;
|
char *neon_timeline;
|
||||||
char *neon_tenant;
|
char *neon_tenant;
|
||||||
bool wal_redo = false;
|
|
||||||
int32 max_cluster_size;
|
int32 max_cluster_size;
|
||||||
|
|
||||||
/* unlogged relation build states */
|
/* unlogged relation build states */
|
||||||
|
|||||||
@@ -43,6 +43,7 @@
|
|||||||
#if PG_VERSION_NUM >= 150000
|
#if PG_VERSION_NUM >= 150000
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#endif
|
#endif
|
||||||
|
#include "storage/fd.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
@@ -69,7 +70,8 @@
|
|||||||
#include "neon.h"
|
#include "neon.h"
|
||||||
#include "walproposer.h"
|
#include "walproposer.h"
|
||||||
#include "walproposer_utils.h"
|
#include "walproposer_utils.h"
|
||||||
#include "replication/walpropshim.h"
|
|
||||||
|
static bool syncSafekeepers = false;
|
||||||
|
|
||||||
char *wal_acceptors_list;
|
char *wal_acceptors_list;
|
||||||
int wal_acceptor_reconnect_timeout;
|
int wal_acceptor_reconnect_timeout;
|
||||||
@@ -117,8 +119,8 @@ static TimestampTz last_reconnect_attempt;
|
|||||||
static WalproposerShmemState * walprop_shared;
|
static WalproposerShmemState * walprop_shared;
|
||||||
|
|
||||||
/* Prototypes for private functions */
|
/* Prototypes for private functions */
|
||||||
static void WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId);
|
static void WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId);
|
||||||
static void WalProposerStartImpl(void);
|
static void WalProposerStart(void);
|
||||||
static void WalProposerLoop(void);
|
static void WalProposerLoop(void);
|
||||||
static void InitEventSet(void);
|
static void InitEventSet(void);
|
||||||
static void UpdateEventSet(Safekeeper *sk, uint32 events);
|
static void UpdateEventSet(Safekeeper *sk, uint32 events);
|
||||||
@@ -186,9 +188,56 @@ pg_init_walproposer(void)
|
|||||||
ProcessInterruptsCallback = backpressure_throttling_impl;
|
ProcessInterruptsCallback = backpressure_throttling_impl;
|
||||||
|
|
||||||
WalProposerRegister();
|
WalProposerRegister();
|
||||||
|
}
|
||||||
|
|
||||||
WalProposerInit = &WalProposerInitImpl;
|
/*
|
||||||
WalProposerStart = &WalProposerStartImpl;
|
* Entry point for `postgres --sync-safekeepers`.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
WalProposerSync(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
struct stat stat_buf;
|
||||||
|
|
||||||
|
syncSafekeepers = true;
|
||||||
|
#if PG_VERSION_NUM < 150000
|
||||||
|
ThisTimeLineID = 1;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Initialize postmaster_alive_fds as WaitEventSet checks them.
|
||||||
|
*
|
||||||
|
* Copied from InitPostmasterDeathWatchHandle()
|
||||||
|
*/
|
||||||
|
if (pipe(postmaster_alive_fds) < 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode_for_file_access(),
|
||||||
|
errmsg_internal("could not create pipe to monitor postmaster death: %m")));
|
||||||
|
if (fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) == -1)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode_for_socket_access(),
|
||||||
|
errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m")));
|
||||||
|
|
||||||
|
ChangeToDataDir();
|
||||||
|
|
||||||
|
/* Create pg_wal directory, if it doesn't exist */
|
||||||
|
if (stat(XLOGDIR, &stat_buf) != 0)
|
||||||
|
{
|
||||||
|
ereport(LOG, (errmsg("creating missing WAL directory \"%s\"", XLOGDIR)));
|
||||||
|
if (MakePGDirectory(XLOGDIR) < 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode_for_file_access(),
|
||||||
|
errmsg("could not create directory \"%s\": %m",
|
||||||
|
XLOGDIR)));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WalProposerInit(0, 0);
|
||||||
|
|
||||||
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
|
WalProposerStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@@ -429,7 +478,7 @@ WalProposerRegister(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId)
|
WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
|
||||||
{
|
{
|
||||||
char *host;
|
char *host;
|
||||||
char *sep;
|
char *sep;
|
||||||
@@ -508,7 +557,7 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
WalProposerStartImpl(void)
|
WalProposerStart(void)
|
||||||
{
|
{
|
||||||
|
|
||||||
/* Initiate connections to all safekeeper nodes */
|
/* Initiate connections to all safekeeper nodes */
|
||||||
|
|||||||
22
pgxn/neon_walredo/Makefile
Normal file
22
pgxn/neon_walredo/Makefile
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# pgxn/neon_walredo/Makefile
|
||||||
|
|
||||||
|
MODULE_big = neon_walredo
|
||||||
|
OBJS = \
|
||||||
|
$(WIN32RES) \
|
||||||
|
inmem_smgr.o \
|
||||||
|
walredoproc.o \
|
||||||
|
|
||||||
|
# This really should be guarded by $(with_libseccomp), but I couldn't
|
||||||
|
# make that work with pgxs. So we always compile it, but its contents
|
||||||
|
# are wrapped in #ifdef HAVE_LIBSECCOMP instead.
|
||||||
|
OBJS += seccomp.o
|
||||||
|
|
||||||
|
PGFILEDESC = "neon_walredo - helper process that runs in Neon pageserver"
|
||||||
|
|
||||||
|
PG_CONFIG = pg_config
|
||||||
|
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||||
|
include $(PGXS)
|
||||||
|
|
||||||
|
ifeq ($(with_libseccomp),yes)
|
||||||
|
SHLIB_LINK += -lseccomp
|
||||||
|
endif
|
||||||
@@ -3,9 +3,8 @@
|
|||||||
* inmem_smgr.c
|
* inmem_smgr.c
|
||||||
*
|
*
|
||||||
* This is an implementation of the SMGR interface, used in the WAL redo
|
* This is an implementation of the SMGR interface, used in the WAL redo
|
||||||
* process (see src/backend/tcop/zenith_wal_redo.c). It has no persistent
|
* process. It has no persistent storage, the pages that are written out
|
||||||
* storage, the pages that are written out are kept in a small number of
|
* are kept in a small number of in-memory buffers.
|
||||||
* in-memory buffers.
|
|
||||||
*
|
*
|
||||||
* Normally, replaying a WAL record only needs to access a handful of
|
* Normally, replaying a WAL record only needs to access a handful of
|
||||||
* buffers, which fit in the normal buffer cache, so this is just for
|
* buffers, which fit in the normal buffer cache, so this is just for
|
||||||
@@ -15,15 +14,11 @@
|
|||||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||||
* Portions Copyright (c) 1994, Regents of the University of California
|
* Portions Copyright (c) 1994, Regents of the University of California
|
||||||
*
|
*
|
||||||
* IDENTIFICATION
|
|
||||||
* contrib/neon/inmem_smgr.c
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
#include "pagestore_client.h"
|
|
||||||
#include "storage/block.h"
|
#include "storage/block.h"
|
||||||
#include "storage/buf_internals.h"
|
#include "storage/buf_internals.h"
|
||||||
#include "storage/relfilenode.h"
|
#include "storage/relfilenode.h"
|
||||||
@@ -33,6 +28,8 @@
|
|||||||
#include "access/xlogutils.h"
|
#include "access/xlogutils.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "inmem_smgr.h"
|
||||||
|
|
||||||
/* Size of the in-memory smgr */
|
/* Size of the in-memory smgr */
|
||||||
#define MAX_PAGES 64
|
#define MAX_PAGES 64
|
||||||
|
|
||||||
@@ -59,10 +56,34 @@ locate_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* neon wal-redo storage manager functionality */
|
||||||
|
static void inmem_init(void);
|
||||||
|
static void inmem_open(SMgrRelation reln);
|
||||||
|
static void inmem_close(SMgrRelation reln, ForkNumber forknum);
|
||||||
|
static void inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo);
|
||||||
|
static bool inmem_exists(SMgrRelation reln, ForkNumber forknum);
|
||||||
|
static void inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
|
||||||
|
static void inmem_extend(SMgrRelation reln, ForkNumber forknum,
|
||||||
|
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||||
|
static bool inmem_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||||
|
BlockNumber blocknum);
|
||||||
|
static void inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||||
|
char *buffer);
|
||||||
|
static void inmem_write(SMgrRelation reln, ForkNumber forknum,
|
||||||
|
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||||
|
static void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||||
|
BlockNumber blocknum, BlockNumber nblocks);
|
||||||
|
static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
|
||||||
|
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
|
||||||
|
BlockNumber nblocks);
|
||||||
|
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* inmem_init() -- Initialize private state
|
* inmem_init() -- Initialize private state
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_init(void)
|
inmem_init(void)
|
||||||
{
|
{
|
||||||
used_pages = 0;
|
used_pages = 0;
|
||||||
@@ -71,7 +92,7 @@ inmem_init(void)
|
|||||||
/*
|
/*
|
||||||
* inmem_exists() -- Does the physical file exist?
|
* inmem_exists() -- Does the physical file exist?
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
inmem_exists(SMgrRelation reln, ForkNumber forknum)
|
inmem_exists(SMgrRelation reln, ForkNumber forknum)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < used_pages; i++)
|
for (int i = 0; i < used_pages; i++)
|
||||||
@@ -90,7 +111,7 @@ inmem_exists(SMgrRelation reln, ForkNumber forknum)
|
|||||||
*
|
*
|
||||||
* If isRedo is true, it's okay for the relation to exist already.
|
* If isRedo is true, it's okay for the relation to exist already.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo)
|
inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -98,7 +119,7 @@ inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo)
|
|||||||
/*
|
/*
|
||||||
* inmem_unlink() -- Unlink a relation.
|
* inmem_unlink() -- Unlink a relation.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo)
|
inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -112,7 +133,7 @@ inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo)
|
|||||||
* EOF). Note that we assume writing a block beyond current EOF
|
* EOF). Note that we assume writing a block beyond current EOF
|
||||||
* causes intervening file space to become filled with zeroes.
|
* causes intervening file space to become filled with zeroes.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
inmem_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||||
char *buffer, bool skipFsync)
|
char *buffer, bool skipFsync)
|
||||||
{
|
{
|
||||||
@@ -123,7 +144,7 @@ inmem_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
|||||||
/*
|
/*
|
||||||
* inmem_open() -- Initialize newly-opened relation.
|
* inmem_open() -- Initialize newly-opened relation.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_open(SMgrRelation reln)
|
inmem_open(SMgrRelation reln)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -131,7 +152,7 @@ inmem_open(SMgrRelation reln)
|
|||||||
/*
|
/*
|
||||||
* inmem_close() -- Close the specified relation, if it isn't closed already.
|
* inmem_close() -- Close the specified relation, if it isn't closed already.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_close(SMgrRelation reln, ForkNumber forknum)
|
inmem_close(SMgrRelation reln, ForkNumber forknum)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -139,7 +160,7 @@ inmem_close(SMgrRelation reln, ForkNumber forknum)
|
|||||||
/*
|
/*
|
||||||
* inmem_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
* inmem_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
@@ -148,7 +169,7 @@ inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
|||||||
/*
|
/*
|
||||||
* inmem_writeback() -- Tell the kernel to write pages back to storage.
|
* inmem_writeback() -- Tell the kernel to write pages back to storage.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||||
BlockNumber blocknum, BlockNumber nblocks)
|
BlockNumber blocknum, BlockNumber nblocks)
|
||||||
{
|
{
|
||||||
@@ -157,7 +178,7 @@ inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
|||||||
/*
|
/*
|
||||||
* inmem_read() -- Read the specified block from a relation.
|
* inmem_read() -- Read the specified block from a relation.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||||
char *buffer)
|
char *buffer)
|
||||||
{
|
{
|
||||||
@@ -177,7 +198,7 @@ inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
|||||||
* relation (ie, those before the current EOF). To extend a relation,
|
* relation (ie, those before the current EOF). To extend a relation,
|
||||||
* use mdextend().
|
* use mdextend().
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||||
char *buffer, bool skipFsync)
|
char *buffer, bool skipFsync)
|
||||||
{
|
{
|
||||||
@@ -224,7 +245,7 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
|||||||
/*
|
/*
|
||||||
* inmem_nblocks() -- Get the number of blocks stored in a relation.
|
* inmem_nblocks() -- Get the number of blocks stored in a relation.
|
||||||
*/
|
*/
|
||||||
BlockNumber
|
static BlockNumber
|
||||||
inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
|
inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@@ -243,7 +264,7 @@ inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
|
|||||||
/*
|
/*
|
||||||
* inmem_truncate() -- Truncate relation to specified number of blocks.
|
* inmem_truncate() -- Truncate relation to specified number of blocks.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -251,7 +272,7 @@ inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
|||||||
/*
|
/*
|
||||||
* inmem_immedsync() -- Immediately sync a relation to stable storage.
|
* inmem_immedsync() -- Immediately sync a relation to stable storage.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
inmem_immedsync(SMgrRelation reln, ForkNumber forknum)
|
inmem_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
17
pgxn/neon_walredo/inmem_smgr.h
Normal file
17
pgxn/neon_walredo/inmem_smgr.h
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* inmem_smgr.h
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||||
|
* Portions Copyright (c) 1994, Regents of the University of California
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
#ifndef INMEM_SMGR_H
|
||||||
|
#define INMEM_SMGR_H
|
||||||
|
|
||||||
|
extern const f_smgr *smgr_inmem(BackendId backend, RelFileNode rnode);
|
||||||
|
extern void smgr_init_inmem(void);
|
||||||
|
|
||||||
|
#endif /* INMEM_SMGR_H */
|
||||||
22
pgxn/neon_walredo/neon_seccomp.h
Normal file
22
pgxn/neon_walredo/neon_seccomp.h
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
#ifndef NEON_SECCOMP_H
|
||||||
|
#define NEON_SECCOMP_H
|
||||||
|
|
||||||
|
#include <seccomp.h>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int psr_syscall; /* syscall number */
|
||||||
|
uint32 psr_action; /* libseccomp action, e.g. SCMP_ACT_ALLOW */
|
||||||
|
} PgSeccompRule;
|
||||||
|
|
||||||
|
#define PG_SCMP(syscall, action) \
|
||||||
|
(PgSeccompRule) { \
|
||||||
|
.psr_syscall = SCMP_SYS(syscall), \
|
||||||
|
.psr_action = (action), \
|
||||||
|
}
|
||||||
|
|
||||||
|
#define PG_SCMP_ALLOW(syscall) \
|
||||||
|
PG_SCMP(syscall, SCMP_ACT_ALLOW)
|
||||||
|
|
||||||
|
extern void seccomp_load_rules(PgSeccompRule *syscalls, int count);
|
||||||
|
|
||||||
|
#endif /* NEON_SECCOMP_H */
|
||||||
257
pgxn/neon_walredo/seccomp.c
Normal file
257
pgxn/neon_walredo/seccomp.c
Normal file
@@ -0,0 +1,257 @@
|
|||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* seccomp.c
|
||||||
|
* Secure Computing BPF API wrapper.
|
||||||
|
*
|
||||||
|
* Pageserver delegates complex WAL decoding duties to postgres,
|
||||||
|
* which means that the latter might fall victim to carefully designed
|
||||||
|
* malicious WAL records and start doing harmful things to the system.
|
||||||
|
* To prevent this, it has been decided to limit possible interactions
|
||||||
|
* with the outside world using the Secure Computing BPF mode.
|
||||||
|
*
|
||||||
|
* We use this mode to disable all syscalls not in the allowlist. This
|
||||||
|
* approach has its pros & cons:
|
||||||
|
*
|
||||||
|
* - We have to carefully handpick and maintain the set of syscalls
|
||||||
|
* required for the WAL redo process. Core dumps help with that.
|
||||||
|
* The method of trial and error seems to work reasonably well,
|
||||||
|
* but it would be nice to find a proper way to "prove" that
|
||||||
|
* the set in question is both necessary and sufficient.
|
||||||
|
*
|
||||||
|
* - Once we enter the seccomp bpf mode, it's impossible to lift those
|
||||||
|
* restrictions (otherwise, what kind of "protection" would that be?).
|
||||||
|
* Thus, we have to either enable extra syscalls for the clean shutdown,
|
||||||
|
* or exit the process immediately via _exit() instead of proc_exit().
|
||||||
|
*
|
||||||
|
* - Should we simply use SCMP_ACT_KILL_PROCESS, or implement a custom
|
||||||
|
* facility to deal with the forbidden syscalls? If we'd like to embed
|
||||||
|
* a startup security test, we should go with the latter; in that
|
||||||
|
* case, which one of the following options is preferable?
|
||||||
|
*
|
||||||
|
* * Catch the denied syscalls with a signal handler using SCMP_ACT_TRAP.
|
||||||
|
* Provide a common signal handler with a static switch to override
|
||||||
|
* its behavior for the test case. This would undermine the whole
|
||||||
|
* purpose of such protection, so we'd have to go further and remap
|
||||||
|
* the memory backing the switch as readonly, then ban mprotect().
|
||||||
|
* Ugly and fragile, to say the least.
|
||||||
|
*
|
||||||
|
* * Yet again, catch the denied syscalls using SCMP_ACT_TRAP.
|
||||||
|
* Provide 2 different signal handlers: one for a test case,
|
||||||
|
* another for the main processing loop. Install the first one,
|
||||||
|
* enable seccomp, perform the test, switch to the second one,
|
||||||
|
* finally ban sigaction(), presto!
|
||||||
|
*
|
||||||
|
* * Spoof the result of a syscall using SECCOMP_RET_ERRNO for the
|
||||||
|
* test, then ban it altogether with another filter. The downside
|
||||||
|
* of this solution is that we don't actually check that
|
||||||
|
* SCMP_ACT_KILL_PROCESS/SCMP_ACT_TRAP works.
|
||||||
|
*
|
||||||
|
* Either approach seems to require two eBPF filter programs,
|
||||||
|
* which is unfortunate: the man page says this is uncommon.
|
||||||
|
* Maybe I (@funbringer) am missing something, though; I encourage
|
||||||
|
* any reader to get familiar with it and scrutinize my conclusions.
|
||||||
|
*
|
||||||
|
* TODOs and ideas in no particular order:
|
||||||
|
*
|
||||||
|
* - Do something about mmap() in musl's malloc().
|
||||||
|
* Definitely not a priority if we don't care about musl.
|
||||||
|
*
|
||||||
|
* - See if we can untangle PG's shutdown sequence (involving unlink()):
|
||||||
|
*
|
||||||
|
* * Simplify (or rather get rid of) shmem setup in PG's WAL redo mode.
|
||||||
|
* * Investigate chroot() or mount namespaces for better FS isolation.
|
||||||
|
* * (Per Heikki) Simply call _exit(), no big deal.
|
||||||
|
* * Come up with a better idea?
|
||||||
|
*
|
||||||
|
* - Make use of seccomp's argument inspection (for what?).
|
||||||
|
* Unfortunately, it views all syscall arguments as scalars,
|
||||||
|
* so it won't work for e.g. string comparison in unlink().
|
||||||
|
*
|
||||||
|
* - Benchmark with bpf jit on/off, try seccomp_syscall_priority().
|
||||||
|
*
|
||||||
|
* - Test against various linux distros & glibc versions.
|
||||||
|
* I suspect that certain libc functions might involve slightly
|
||||||
|
* different syscalls, e.g. select/pselect6/pselect6_time64/whatever.
|
||||||
|
*
|
||||||
|
* - Test on any arch other than amd64 to see if it works there.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* I couldn't find a good way to do a conditional OBJS += seccomp.o in
|
||||||
|
* the Makefile, so this file is compiled even when seccomp is disabled,
|
||||||
|
* it's just empty in that case.
|
||||||
|
*/
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "neon_seccomp.h"
|
||||||
|
|
||||||
|
static void die(int code, const char *str);
|
||||||
|
|
||||||
|
static bool seccomp_test_sighandler_done = false;
|
||||||
|
static void seccomp_test_sighandler(int signum, siginfo_t *info, void *cxt);
|
||||||
|
static void seccomp_deny_sighandler(int signum, siginfo_t *info, void *cxt);
|
||||||
|
|
||||||
|
static int do_seccomp_load_rules(PgSeccompRule *rules, int count, uint32 def_action);
|
||||||
|
|
||||||
|
void
|
||||||
|
seccomp_load_rules(PgSeccompRule *rules, int count)
|
||||||
|
{
|
||||||
|
struct sigaction action = { .sa_flags = SA_SIGINFO };
|
||||||
|
PgSeccompRule rule;
|
||||||
|
long fd;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Install a test signal handler.
|
||||||
|
* XXX: pqsignal() is too restrictive for our purposes,
|
||||||
|
* since we'd like to examine the contents of siginfo_t.
|
||||||
|
*/
|
||||||
|
action.sa_sigaction = seccomp_test_sighandler;
|
||||||
|
if (sigaction(SIGSYS, &action, NULL) != 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: could not install test SIGSYS handler")));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* First, check that open of a well-known file works.
|
||||||
|
* XXX: We use raw syscall() to call the very open().
|
||||||
|
*/
|
||||||
|
fd = syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
|
||||||
|
if (seccomp_test_sighandler_done)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: signal handler test flag was set unexpectedly")));
|
||||||
|
if (fd < 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: could not open /dev/null for seccomp testing: %m")));
|
||||||
|
close((int) fd);
|
||||||
|
|
||||||
|
/* Set a trap on open() to test seccomp bpf */
|
||||||
|
rule = PG_SCMP(open, SCMP_ACT_TRAP);
|
||||||
|
if (do_seccomp_load_rules(&rule, 1, SCMP_ACT_ALLOW) != 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: could not load test trap")));
|
||||||
|
|
||||||
|
/* Finally, check that open() now raises SIGSYS */
|
||||||
|
(void) syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
|
||||||
|
if (!seccomp_test_sighandler_done)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: SIGSYS handler doesn't seem to work")));
|
||||||
|
|
||||||
|
/* Now that everything seems to work, install a proper handler */
|
||||||
|
action.sa_sigaction = seccomp_deny_sighandler;
|
||||||
|
if (sigaction(SIGSYS, &action, NULL) != 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: could not install SIGSYS handler")));
|
||||||
|
|
||||||
|
/* If this succeeds, any syscall not in the list will crash the process */
|
||||||
|
if (do_seccomp_load_rules(rules, count, SCMP_ACT_TRAP) != 0)
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||||
|
errmsg("seccomp: could not enter seccomp mode")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Enter seccomp mode with a BPF filter that will only allow
|
||||||
|
* certain syscalls to proceed.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
do_seccomp_load_rules(PgSeccompRule *rules, int count, uint32 def_action)
|
||||||
|
{
|
||||||
|
scmp_filter_ctx ctx;
|
||||||
|
int rc = -1;
|
||||||
|
|
||||||
|
/* Create a context with a default action for syscalls not in the list */
|
||||||
|
if ((ctx = seccomp_init(def_action)) == NULL)
|
||||||
|
goto cleanup;
|
||||||
|
|
||||||
|
for (int i = 0; i < count; i++)
|
||||||
|
{
|
||||||
|
PgSeccompRule *rule = &rules[i];
|
||||||
|
if ((rc = seccomp_rule_add(ctx, rule->psr_action, rule->psr_syscall, 0)) != 0)
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Try building & loading the program into the kernel */
|
||||||
|
if ((rc = seccomp_load(ctx)) != 0)
|
||||||
|
goto cleanup;
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
/*
|
||||||
|
* We don't need the context anymore regardless of the result,
|
||||||
|
* since either we failed or the eBPF program has already been
|
||||||
|
* loaded into the linux kernel.
|
||||||
|
*/
|
||||||
|
seccomp_release(ctx);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
die(int code, const char *str)
|
||||||
|
{
|
||||||
|
/* work around gcc ignoring that it shouldn't warn on (void) result being unused */
|
||||||
|
ssize_t _unused pg_attribute_unused();
|
||||||
|
/* Best effort write to stderr */
|
||||||
|
_unused = write(fileno(stderr), str, strlen(str));
|
||||||
|
|
||||||
|
/* XXX: we don't want to run any atexit callbacks */
|
||||||
|
_exit(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
seccomp_test_sighandler(int signum, siginfo_t *info, void *cxt pg_attribute_unused())
|
||||||
|
{
|
||||||
|
#define DIE_PREFIX "seccomp test signal handler: "
|
||||||
|
|
||||||
|
/* Check that this signal handler is used only for a single test case */
|
||||||
|
if (seccomp_test_sighandler_done)
|
||||||
|
die(1, DIE_PREFIX "test handler should only be used for 1 test\n");
|
||||||
|
seccomp_test_sighandler_done = true;
|
||||||
|
|
||||||
|
if (signum != SIGSYS)
|
||||||
|
die(1, DIE_PREFIX "bad signal number\n");
|
||||||
|
|
||||||
|
/* TODO: maybe somehow extract the hardcoded syscall number */
|
||||||
|
if (info->si_syscall != SCMP_SYS(open))
|
||||||
|
die(1, DIE_PREFIX "bad syscall number\n");
|
||||||
|
|
||||||
|
#undef DIE_PREFIX
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
seccomp_deny_sighandler(int signum, siginfo_t *info, void *cxt pg_attribute_unused())
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Unfortunately, we can't use seccomp_syscall_resolve_num_arch()
|
||||||
|
* to resolve the syscall's name, since it calls strdup()
|
||||||
|
* under the hood (wtf!).
|
||||||
|
*/
|
||||||
|
char buffer[128];
|
||||||
|
(void)snprintf(buffer, lengthof(buffer),
|
||||||
|
"---------------------------------------\n"
|
||||||
|
"seccomp: bad syscall %d\n"
|
||||||
|
"---------------------------------------\n",
|
||||||
|
info->si_syscall);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Instead of silently crashing the process with
|
||||||
|
* a fake SIGSYS caused by SCMP_ACT_KILL_PROCESS,
|
||||||
|
* we'd like to receive a real SIGSYS to print the
|
||||||
|
* message and *then* immediately exit.
|
||||||
|
*/
|
||||||
|
die(1, buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* HAVE_LIBSECCOMP */
|
||||||
847
pgxn/neon_walredo/walredoproc.c
Normal file
847
pgxn/neon_walredo/walredoproc.c
Normal file
@@ -0,0 +1,847 @@
|
|||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* walredoproc.c
|
||||||
|
* Entry point for WAL redo helper
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* This file contains an alternative main() function for the 'postgres'
|
||||||
|
* binary. In WAL redo mode, the process runs in a special mode that's similar
|
||||||
|
* to the single user mode. We don't launch postmaster or any auxiliary
|
||||||
|
* processes. Instead, we wait for command from 'stdin', and respond to
|
||||||
|
* 'stdout'.
|
||||||
|
*
|
||||||
|
* The protocol through stdin/stdout is loosely based on the libpq protocol.
|
||||||
|
* The process accepts messages through stdin, and each message has the format:
|
||||||
|
*
|
||||||
|
* char msgtype;
|
||||||
|
* int32 length; // length of message including 'length' but excluding
|
||||||
|
* // 'msgtype', in network byte order
|
||||||
|
* <payload>
|
||||||
|
*
|
||||||
|
* There are four message types:
|
||||||
|
*
|
||||||
|
* BeginRedoForBlock ('B'): Prepare for WAL replay for given block
|
||||||
|
* PushPage ('P'): Copy a page image (in the payload) to buffer cache
|
||||||
|
* ApplyRecord ('A'): Apply a WAL record (in the payload)
|
||||||
|
* GetPage ('G'): Return a page image from buffer cache.
|
||||||
|
*
|
||||||
|
* Currently, you only get a response to GetPage requests; the response is
|
||||||
|
* simply an 8k page, without any headers. Errors are logged to stderr.
|
||||||
|
*
|
||||||
|
* FIXME:
|
||||||
|
* - this currently requires a valid PGDATA, and creates a lock file there
|
||||||
|
* like a normal postmaster. There's no fundamental reason for that, though.
|
||||||
|
* - should have EndRedoForBlock, and flush page cache, to allow using this
|
||||||
|
* mechanism for more than one block without restarting the process.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||||
|
* Portions Copyright (c) 1994, Regents of the University of California
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <limits.h>
|
||||||
|
#include <signal.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#ifdef HAVE_SYS_SELECT_H
|
||||||
|
#include <sys/select.h>
|
||||||
|
#endif
|
||||||
|
#ifdef HAVE_SYS_RESOURCE_H
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <sys/resource.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if defined(HAVE_LIBSECCOMP) && defined(__GLIBC__)
|
||||||
|
#define MALLOC_NO_MMAP
|
||||||
|
#include <malloc.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef HAVE_GETRUSAGE
|
||||||
|
#include "rusagestub.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "access/xlog.h"
|
||||||
|
#include "access/xlog_internal.h"
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
#include "access/xlogrecovery.h"
|
||||||
|
#endif
|
||||||
|
#include "access/xlogutils.h"
|
||||||
|
#include "catalog/pg_class.h"
|
||||||
|
#include "libpq/libpq.h"
|
||||||
|
#include "libpq/pqformat.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "postmaster/postmaster.h"
|
||||||
|
#include "storage/buf_internals.h"
|
||||||
|
#include "storage/bufmgr.h"
|
||||||
|
#include "storage/ipc.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
|
#include "storage/smgr.h"
|
||||||
|
#include "tcop/tcopprot.h"
|
||||||
|
#include "utils/memutils.h"
|
||||||
|
#include "utils/ps_status.h"
|
||||||
|
|
||||||
|
#include "inmem_smgr.h"
|
||||||
|
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
#include "neon_seccomp.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
|
static int ReadRedoCommand(StringInfo inBuf);
|
||||||
|
static void BeginRedoForBlock(StringInfo input_message);
|
||||||
|
static void PushPage(StringInfo input_message);
|
||||||
|
static void ApplyRecord(StringInfo input_message);
|
||||||
|
static void apply_error_callback(void *arg);
|
||||||
|
static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
|
||||||
|
static void GetPage(StringInfo input_message);
|
||||||
|
static ssize_t buffered_read(void *buf, size_t count);
|
||||||
|
|
||||||
|
static BufferTag target_redo_tag;
|
||||||
|
|
||||||
|
static XLogReaderState *reader_state;
|
||||||
|
|
||||||
|
#define TRACE DEBUG5
|
||||||
|
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
static void
|
||||||
|
enter_seccomp_mode(void)
|
||||||
|
{
|
||||||
|
PgSeccompRule syscalls[] =
|
||||||
|
{
|
||||||
|
/* Hard requirements */
|
||||||
|
PG_SCMP_ALLOW(exit_group),
|
||||||
|
PG_SCMP_ALLOW(pselect6),
|
||||||
|
PG_SCMP_ALLOW(read),
|
||||||
|
PG_SCMP_ALLOW(select),
|
||||||
|
PG_SCMP_ALLOW(write),
|
||||||
|
|
||||||
|
/* Memory allocation */
|
||||||
|
PG_SCMP_ALLOW(brk),
|
||||||
|
#ifndef MALLOC_NO_MMAP
|
||||||
|
/* TODO: musl doesn't have mallopt */
|
||||||
|
PG_SCMP_ALLOW(mmap),
|
||||||
|
PG_SCMP_ALLOW(munmap),
|
||||||
|
#endif
|
||||||
|
/*
|
||||||
|
* getpid() is called on assertion failure, in ExceptionalCondition.
|
||||||
|
* It's not really needed, but seems pointless to hide it either. The
|
||||||
|
* system call unlikely to expose a kernel vulnerability, and the PID
|
||||||
|
* is stored in MyProcPid anyway.
|
||||||
|
*/
|
||||||
|
PG_SCMP_ALLOW(getpid),
|
||||||
|
|
||||||
|
/* Enable those for a proper shutdown.
|
||||||
|
PG_SCMP_ALLOW(munmap),
|
||||||
|
PG_SCMP_ALLOW(shmctl),
|
||||||
|
PG_SCMP_ALLOW(shmdt),
|
||||||
|
PG_SCMP_ALLOW(unlink), // shm_unlink
|
||||||
|
*/
|
||||||
|
};
|
||||||
|
|
||||||
|
#ifdef MALLOC_NO_MMAP
|
||||||
|
/* Ask glibc not to use mmap() */
|
||||||
|
mallopt(M_MMAP_MAX, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
seccomp_load_rules(syscalls, lengthof(syscalls));
|
||||||
|
}
|
||||||
|
#endif /* HAVE_LIBSECCOMP */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Entry point for the WAL redo process.
|
||||||
|
*
|
||||||
|
* Performs similar initialization as PostgresMain does for normal
|
||||||
|
* backend processes. Some initialization was done in CallExtMain
|
||||||
|
* already.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
WalRedoMain(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
int firstchar;
|
||||||
|
StringInfoData input_message;
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
bool enable_seccomp;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
am_wal_redo_postgres = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WAL redo does not need a large number of buffers. And speed of
|
||||||
|
* DropRelFileNodeAllLocalBuffers() is proportional to the number of
|
||||||
|
* buffers. So let's keep it small (default value is 1024)
|
||||||
|
*/
|
||||||
|
num_temp_buffers = 4;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* install the simple in-memory smgr
|
||||||
|
*/
|
||||||
|
smgr_hook = smgr_inmem;
|
||||||
|
smgr_init_hook = smgr_init_inmem;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Validate we have been given a reasonable-looking DataDir and change into it.
|
||||||
|
*/
|
||||||
|
checkDataDir();
|
||||||
|
ChangeToDataDir();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Create lockfile for data directory.
|
||||||
|
*/
|
||||||
|
CreateDataDirLockFile(false);
|
||||||
|
|
||||||
|
/* read control file (error checking and contains config ) */
|
||||||
|
LocalProcessControlFile(false);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* process any libraries that should be preloaded at postmaster start
|
||||||
|
*/
|
||||||
|
process_shared_preload_libraries();
|
||||||
|
|
||||||
|
/* Initialize MaxBackends (if under postmaster, was done already) */
|
||||||
|
InitializeMaxBackends();
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
/*
|
||||||
|
* Give preloaded libraries a chance to request additional shared memory.
|
||||||
|
*/
|
||||||
|
process_shmem_requests();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Now that loadable modules have had their chance to request additional
|
||||||
|
* shared memory, determine the value of any runtime-computed GUCs that
|
||||||
|
* depend on the amount of shared memory required.
|
||||||
|
*/
|
||||||
|
InitializeShmemGUCs();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Now that modules have been loaded, we can process any custom resource
|
||||||
|
* managers specified in the wal_consistency_checking GUC.
|
||||||
|
*/
|
||||||
|
InitializeWalConsistencyChecking();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
CreateSharedMemoryAndSemaphores();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Remember stand-alone backend startup time,roughly at the same point
|
||||||
|
* during startup that postmaster does so.
|
||||||
|
*/
|
||||||
|
PgStartTime = GetCurrentTimestamp();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Create a per-backend PGPROC struct in shared memory. We must do
|
||||||
|
* this before we can use LWLocks.
|
||||||
|
*/
|
||||||
|
InitAuxiliaryProcess();
|
||||||
|
|
||||||
|
SetProcessingMode(NormalProcessing);
|
||||||
|
|
||||||
|
/* Redo routines won't work if we're not "in recovery" */
|
||||||
|
InRecovery = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Create the memory context we will use in the main loop.
|
||||||
|
*
|
||||||
|
* MessageContext is reset once per iteration of the main loop, ie, upon
|
||||||
|
* completion of processing of each command message from the client.
|
||||||
|
*/
|
||||||
|
MessageContext = AllocSetContextCreate(TopMemoryContext,
|
||||||
|
"MessageContext",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
|
||||||
|
/* we need a ResourceOwner to hold buffer pins */
|
||||||
|
Assert(CurrentResourceOwner == NULL);
|
||||||
|
CurrentResourceOwner = ResourceOwnerCreate(NULL, "wal redo");
|
||||||
|
|
||||||
|
/* Initialize resource managers */
|
||||||
|
for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||||
|
{
|
||||||
|
if (RmgrTable[rmid].rm_startup != NULL)
|
||||||
|
RmgrTable[rmid].rm_startup();
|
||||||
|
}
|
||||||
|
reader_state = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(), NULL);
|
||||||
|
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
/* We prefer opt-out to opt-in for greater security */
|
||||||
|
enable_seccomp = true;
|
||||||
|
for (int i = 1; i < argc; i++)
|
||||||
|
if (strcmp(argv[i], "--disable-seccomp") == 0)
|
||||||
|
enable_seccomp = false;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We deliberately delay the transition to the seccomp mode
|
||||||
|
* until it's time to enter the main processing loop;
|
||||||
|
* else we'd have to add a lot more syscalls to the allowlist.
|
||||||
|
*/
|
||||||
|
if (enable_seccomp)
|
||||||
|
enter_seccomp_mode();
|
||||||
|
#endif /* HAVE_LIBSECCOMP */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Main processing loop
|
||||||
|
*/
|
||||||
|
MemoryContextSwitchTo(MessageContext);
|
||||||
|
initStringInfo(&input_message);
|
||||||
|
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
/* Release memory left over from prior query cycle. */
|
||||||
|
resetStringInfo(&input_message);
|
||||||
|
|
||||||
|
set_ps_display("idle");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (3) read a command (loop blocks here)
|
||||||
|
*/
|
||||||
|
firstchar = ReadRedoCommand(&input_message);
|
||||||
|
switch (firstchar)
|
||||||
|
{
|
||||||
|
case 'B': /* BeginRedoForBlock */
|
||||||
|
BeginRedoForBlock(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'P': /* PushPage */
|
||||||
|
PushPage(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'A': /* ApplyRecord */
|
||||||
|
ApplyRecord(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'G': /* GetPage */
|
||||||
|
GetPage(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EOF means we're done. Perform normal shutdown.
|
||||||
|
*/
|
||||||
|
case EOF:
|
||||||
|
ereport(LOG,
|
||||||
|
(errmsg("received EOF on stdin, shutting down")));
|
||||||
|
|
||||||
|
#ifdef HAVE_LIBSECCOMP
|
||||||
|
/*
|
||||||
|
* Skip the shutdown sequence, leaving some garbage behind.
|
||||||
|
* Hopefully, postgres will clean it up in the next run.
|
||||||
|
* This way we don't have to enable extra syscalls, which is nice.
|
||||||
|
* See enter_seccomp_mode() above.
|
||||||
|
*/
|
||||||
|
if (enable_seccomp)
|
||||||
|
_exit(0);
|
||||||
|
#endif /* HAVE_LIBSECCOMP */
|
||||||
|
/*
|
||||||
|
* NOTE: if you are tempted to add more code here, DON'T!
|
||||||
|
* Whatever you had in mind to do should be set up as an
|
||||||
|
* on_proc_exit or on_shmem_exit callback, instead. Otherwise
|
||||||
|
* it will fail to be called during other backend-shutdown
|
||||||
|
* scenarios.
|
||||||
|
*/
|
||||||
|
proc_exit(0);
|
||||||
|
|
||||||
|
default:
|
||||||
|
ereport(FATAL,
|
||||||
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
|
errmsg("invalid frontend message type %d",
|
||||||
|
firstchar)));
|
||||||
|
}
|
||||||
|
} /* end of input-reading loop */
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Version compatility wrapper for ReadBufferWithoutRelcache */
|
||||||
|
static inline Buffer
|
||||||
|
NeonRedoReadBuffer(RelFileNode rnode,
|
||||||
|
ForkNumber forkNum, BlockNumber blockNum,
|
||||||
|
ReadBufferMode mode)
|
||||||
|
{
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
return ReadBufferWithoutRelcache(rnode, forkNum, blockNum, mode,
|
||||||
|
NULL, /* no strategy */
|
||||||
|
true); /* WAL redo is only performed on permanent rels */
|
||||||
|
#else
|
||||||
|
return ReadBufferWithoutRelcache(rnode, forkNum, blockNum, mode,
|
||||||
|
NULL); /* no strategy */
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Some debug function that may be handy for now.
|
||||||
|
*/
|
||||||
|
pg_attribute_unused()
|
||||||
|
static char *
|
||||||
|
pprint_buffer(char *data, int len)
|
||||||
|
{
|
||||||
|
StringInfoData s;
|
||||||
|
|
||||||
|
initStringInfo(&s);
|
||||||
|
appendStringInfo(&s, "\n");
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
|
||||||
|
appendStringInfo(&s, "%02x ", (*(((char *) data) + i) & 0xff) );
|
||||||
|
if (i % 32 == 31) {
|
||||||
|
appendStringInfo(&s, "\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
appendStringInfo(&s, "\n");
|
||||||
|
|
||||||
|
return s.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ----------------------------------------------------------------
|
||||||
|
* routines to obtain user input
|
||||||
|
* ----------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Read next command from the client.
|
||||||
|
*
|
||||||
|
* the string entered by the user is placed in its parameter inBuf,
|
||||||
|
* and we act like a Q message was received.
|
||||||
|
*
|
||||||
|
* EOF is returned if end-of-file input is seen; time to shut down.
|
||||||
|
* ----------------
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
ReadRedoCommand(StringInfo inBuf)
|
||||||
|
{
|
||||||
|
ssize_t ret;
|
||||||
|
char hdr[1 + sizeof(int32)];
|
||||||
|
int qtype;
|
||||||
|
int32 len;
|
||||||
|
|
||||||
|
/* Read message type and message length */
|
||||||
|
ret = buffered_read(hdr, sizeof(hdr));
|
||||||
|
if (ret != sizeof(hdr))
|
||||||
|
{
|
||||||
|
if (ret == 0)
|
||||||
|
return EOF;
|
||||||
|
else if (ret < 0)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
errmsg("could not read message header: %m")));
|
||||||
|
else
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
|
errmsg("unexpected EOF")));
|
||||||
|
}
|
||||||
|
|
||||||
|
qtype = hdr[0];
|
||||||
|
memcpy(&len, &hdr[1], sizeof(int32));
|
||||||
|
len = pg_ntoh32(len);
|
||||||
|
|
||||||
|
if (len < 4)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
|
errmsg("invalid message length")));
|
||||||
|
|
||||||
|
len -= 4; /* discount length itself */
|
||||||
|
|
||||||
|
/* Read the message payload */
|
||||||
|
enlargeStringInfo(inBuf, len);
|
||||||
|
ret = buffered_read(inBuf->data, len);
|
||||||
|
if (ret != len)
|
||||||
|
{
|
||||||
|
if (ret < 0)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
errmsg("could not read message: %m")));
|
||||||
|
else
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
|
errmsg("unexpected EOF")));
|
||||||
|
}
|
||||||
|
inBuf->len = len;
|
||||||
|
inBuf->data[len] = '\0';
|
||||||
|
|
||||||
|
return qtype;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prepare for WAL replay on given block
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BeginRedoForBlock(StringInfo input_message)
|
||||||
|
{
|
||||||
|
RelFileNode rnode;
|
||||||
|
ForkNumber forknum;
|
||||||
|
BlockNumber blknum;
|
||||||
|
SMgrRelation reln;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* message format:
|
||||||
|
*
|
||||||
|
* spcNode
|
||||||
|
* dbNode
|
||||||
|
* relNode
|
||||||
|
* ForkNumber
|
||||||
|
* BlockNumber
|
||||||
|
*/
|
||||||
|
forknum = pq_getmsgbyte(input_message);
|
||||||
|
rnode.spcNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.dbNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.relNode = pq_getmsgint(input_message, 4);
|
||||||
|
blknum = pq_getmsgint(input_message, 4);
|
||||||
|
wal_redo_buffer = InvalidBuffer;
|
||||||
|
|
||||||
|
INIT_BUFFERTAG(target_redo_tag, rnode, forknum, blknum);
|
||||||
|
|
||||||
|
elog(TRACE, "BeginRedoForBlock %u/%u/%u.%d blk %u",
|
||||||
|
target_redo_tag.rnode.spcNode,
|
||||||
|
target_redo_tag.rnode.dbNode,
|
||||||
|
target_redo_tag.rnode.relNode,
|
||||||
|
target_redo_tag.forkNum,
|
||||||
|
target_redo_tag.blockNum);
|
||||||
|
|
||||||
|
reln = smgropen(rnode, InvalidBackendId, RELPERSISTENCE_PERMANENT);
|
||||||
|
if (reln->smgr_cached_nblocks[forknum] == InvalidBlockNumber ||
|
||||||
|
reln->smgr_cached_nblocks[forknum] < blknum + 1)
|
||||||
|
{
|
||||||
|
reln->smgr_cached_nblocks[forknum] = blknum + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Receive a page given by the client, and put it into buffer cache.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PushPage(StringInfo input_message)
|
||||||
|
{
|
||||||
|
RelFileNode rnode;
|
||||||
|
ForkNumber forknum;
|
||||||
|
BlockNumber blknum;
|
||||||
|
const char *content;
|
||||||
|
Buffer buf;
|
||||||
|
Page page;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* message format:
|
||||||
|
*
|
||||||
|
* spcNode
|
||||||
|
* dbNode
|
||||||
|
* relNode
|
||||||
|
* ForkNumber
|
||||||
|
* BlockNumber
|
||||||
|
* 8k page content
|
||||||
|
*/
|
||||||
|
forknum = pq_getmsgbyte(input_message);
|
||||||
|
rnode.spcNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.dbNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.relNode = pq_getmsgint(input_message, 4);
|
||||||
|
blknum = pq_getmsgint(input_message, 4);
|
||||||
|
content = pq_getmsgbytes(input_message, BLCKSZ);
|
||||||
|
|
||||||
|
buf = NeonRedoReadBuffer(rnode, forknum, blknum, RBM_ZERO_AND_LOCK);
|
||||||
|
wal_redo_buffer = buf;
|
||||||
|
page = BufferGetPage(buf);
|
||||||
|
memcpy(page, content, BLCKSZ);
|
||||||
|
MarkBufferDirty(buf); /* pro forma */
|
||||||
|
UnlockReleaseBuffer(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Receive a WAL record, and apply it.
|
||||||
|
*
|
||||||
|
* All the pages should be loaded into the buffer cache by PushPage calls already.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ApplyRecord(StringInfo input_message)
|
||||||
|
{
|
||||||
|
char *errormsg;
|
||||||
|
XLogRecPtr lsn;
|
||||||
|
XLogRecord *record;
|
||||||
|
int nleft;
|
||||||
|
ErrorContextCallback errcallback;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
DecodedXLogRecord *decoded;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* message format:
|
||||||
|
*
|
||||||
|
* LSN (the *end* of the record)
|
||||||
|
* record
|
||||||
|
*/
|
||||||
|
lsn = pq_getmsgint64(input_message);
|
||||||
|
|
||||||
|
smgrinit(); /* reset inmem smgr state */
|
||||||
|
|
||||||
|
/* note: the input must be aligned here */
|
||||||
|
record = (XLogRecord *) pq_getmsgbytes(input_message, sizeof(XLogRecord));
|
||||||
|
|
||||||
|
nleft = input_message->len - input_message->cursor;
|
||||||
|
if (record->xl_tot_len != sizeof(XLogRecord) + nleft)
|
||||||
|
elog(ERROR, "mismatch between record (%d) and message size (%d)",
|
||||||
|
record->xl_tot_len, (int) sizeof(XLogRecord) + nleft);
|
||||||
|
|
||||||
|
/* Setup error traceback support for ereport() */
|
||||||
|
errcallback.callback = apply_error_callback;
|
||||||
|
errcallback.arg = (void *) reader_state;
|
||||||
|
errcallback.previous = error_context_stack;
|
||||||
|
error_context_stack = &errcallback;
|
||||||
|
|
||||||
|
XLogBeginRead(reader_state, lsn);
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
decoded = (DecodedXLogRecord *) XLogReadRecordAlloc(reader_state, record->xl_tot_len, true);
|
||||||
|
|
||||||
|
if (!DecodeXLogRecord(reader_state, decoded, record, lsn, &errormsg))
|
||||||
|
elog(ERROR, "failed to decode WAL record: %s", errormsg);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Record the location of the next record. */
|
||||||
|
decoded->next_lsn = reader_state->NextRecPtr;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If it's in the decode buffer, mark the decode buffer space as
|
||||||
|
* occupied.
|
||||||
|
*/
|
||||||
|
if (!decoded->oversized)
|
||||||
|
{
|
||||||
|
/* The new decode buffer head must be MAXALIGNed. */
|
||||||
|
Assert(decoded->size == MAXALIGN(decoded->size));
|
||||||
|
if ((char *) decoded == reader_state->decode_buffer)
|
||||||
|
reader_state->decode_buffer_tail = reader_state->decode_buffer + decoded->size;
|
||||||
|
else
|
||||||
|
reader_state->decode_buffer_tail += decoded->size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Insert it into the queue of decoded records. */
|
||||||
|
Assert(reader_state->decode_queue_tail != decoded);
|
||||||
|
if (reader_state->decode_queue_tail)
|
||||||
|
reader_state->decode_queue_tail->next = decoded;
|
||||||
|
reader_state->decode_queue_tail = decoded;
|
||||||
|
if (!reader_state->decode_queue_head)
|
||||||
|
reader_state->decode_queue_head = decoded;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Update the pointers to the beginning and one-past-the-end of this
|
||||||
|
* record, again for the benefit of historical code that expected the
|
||||||
|
* decoder to track this rather than accessing these fields of the record
|
||||||
|
* itself.
|
||||||
|
*/
|
||||||
|
reader_state->record = reader_state->decode_queue_head;
|
||||||
|
reader_state->ReadRecPtr = reader_state->record->lsn;
|
||||||
|
reader_state->EndRecPtr = reader_state->record->next_lsn;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
/*
|
||||||
|
* In lieu of calling XLogReadRecord, store the record 'decoded_record'
|
||||||
|
* buffer directly.
|
||||||
|
*/
|
||||||
|
reader_state->ReadRecPtr = lsn;
|
||||||
|
reader_state->decoded_record = record;
|
||||||
|
if (!DecodeXLogRecord(reader_state, record, &errormsg))
|
||||||
|
elog(ERROR, "failed to decode WAL record: %s", errormsg);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Ignore any other blocks than the ones the caller is interested in */
|
||||||
|
redo_read_buffer_filter = redo_block_filter;
|
||||||
|
|
||||||
|
RmgrTable[record->xl_rmid].rm_redo(reader_state);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If no base image of the page was provided by PushPage, initialize
|
||||||
|
* wal_redo_buffer here. The first WAL record must initialize the page
|
||||||
|
* in that case.
|
||||||
|
*/
|
||||||
|
if (BufferIsInvalid(wal_redo_buffer))
|
||||||
|
{
|
||||||
|
wal_redo_buffer = NeonRedoReadBuffer(target_redo_tag.rnode,
|
||||||
|
target_redo_tag.forkNum,
|
||||||
|
target_redo_tag.blockNum,
|
||||||
|
RBM_NORMAL);
|
||||||
|
Assert(!BufferIsInvalid(wal_redo_buffer));
|
||||||
|
ReleaseBuffer(wal_redo_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
redo_read_buffer_filter = NULL;
|
||||||
|
|
||||||
|
/* Pop the error context stack */
|
||||||
|
error_context_stack = errcallback.previous;
|
||||||
|
|
||||||
|
elog(TRACE, "applied WAL record with LSN %X/%X",
|
||||||
|
(uint32) (lsn >> 32), (uint32) lsn);
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
if (decoded && decoded->oversized)
|
||||||
|
pfree(decoded);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Error context callback for errors occurring during ApplyRecord
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
apply_error_callback(void *arg)
|
||||||
|
{
|
||||||
|
XLogReaderState *record = (XLogReaderState *) arg;
|
||||||
|
StringInfoData buf;
|
||||||
|
|
||||||
|
initStringInfo(&buf);
|
||||||
|
xlog_outdesc(&buf, record);
|
||||||
|
|
||||||
|
/* translator: %s is a WAL record description */
|
||||||
|
errcontext("WAL redo at %X/%X for %s",
|
||||||
|
LSN_FORMAT_ARGS(record->ReadRecPtr),
|
||||||
|
buf.data);
|
||||||
|
|
||||||
|
pfree(buf.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
redo_block_filter(XLogReaderState *record, uint8 block_id)
|
||||||
|
{
|
||||||
|
BufferTag target_tag;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
XLogRecGetBlockTag(record, block_id,
|
||||||
|
&target_tag.rnode, &target_tag.forkNum, &target_tag.blockNum);
|
||||||
|
#else
|
||||||
|
if (!XLogRecGetBlockTag(record, block_id,
|
||||||
|
&target_tag.rnode, &target_tag.forkNum, &target_tag.blockNum))
|
||||||
|
{
|
||||||
|
/* Caller specified a bogus block_id */
|
||||||
|
elog(PANIC, "failed to locate backup block with ID %d", block_id);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Can a WAL redo function ever access a relation other than the one that
|
||||||
|
* it modifies? I don't see why it would.
|
||||||
|
*/
|
||||||
|
if (!RelFileNodeEquals(target_tag.rnode, target_redo_tag.rnode))
|
||||||
|
elog(WARNING, "REDO accessing unexpected page: %u/%u/%u.%u blk %u",
|
||||||
|
target_tag.rnode.spcNode, target_tag.rnode.dbNode, target_tag.rnode.relNode, target_tag.forkNum, target_tag.blockNum);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If this block isn't one we are currently restoring, then return 'true'
|
||||||
|
* so that this gets ignored
|
||||||
|
*/
|
||||||
|
return !BUFFERTAGS_EQUAL(target_tag, target_redo_tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get a page image back from buffer cache.
|
||||||
|
*
|
||||||
|
* After applying some records.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
GetPage(StringInfo input_message)
|
||||||
|
{
|
||||||
|
RelFileNode rnode;
|
||||||
|
ForkNumber forknum;
|
||||||
|
BlockNumber blknum;
|
||||||
|
Buffer buf;
|
||||||
|
Page page;
|
||||||
|
int tot_written;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* message format:
|
||||||
|
*
|
||||||
|
* spcNode
|
||||||
|
* dbNode
|
||||||
|
* relNode
|
||||||
|
* ForkNumber
|
||||||
|
* BlockNumber
|
||||||
|
*/
|
||||||
|
forknum = pq_getmsgbyte(input_message);
|
||||||
|
rnode.spcNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.dbNode = pq_getmsgint(input_message, 4);
|
||||||
|
rnode.relNode = pq_getmsgint(input_message, 4);
|
||||||
|
blknum = pq_getmsgint(input_message, 4);
|
||||||
|
|
||||||
|
/* FIXME: check that we got a BeginRedoForBlock message or this earlier */
|
||||||
|
|
||||||
|
buf = NeonRedoReadBuffer(rnode, forknum, blknum, RBM_NORMAL);
|
||||||
|
Assert(buf == wal_redo_buffer);
|
||||||
|
page = BufferGetPage(buf);
|
||||||
|
/* single thread, so don't bother locking the page */
|
||||||
|
|
||||||
|
/* Response: Page content */
|
||||||
|
tot_written = 0;
|
||||||
|
do {
|
||||||
|
ssize_t rc;
|
||||||
|
|
||||||
|
rc = write(STDOUT_FILENO, &page[tot_written], BLCKSZ - tot_written);
|
||||||
|
if (rc < 0) {
|
||||||
|
/* If interrupted by signal, just retry */
|
||||||
|
if (errno == EINTR)
|
||||||
|
continue;
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode_for_file_access(),
|
||||||
|
errmsg("could not write to stdout: %m")));
|
||||||
|
}
|
||||||
|
tot_written += rc;
|
||||||
|
} while (tot_written < BLCKSZ);
|
||||||
|
|
||||||
|
ReleaseBuffer(buf);
|
||||||
|
DropRelFileNodeAllLocalBuffers(rnode);
|
||||||
|
wal_redo_buffer = InvalidBuffer;
|
||||||
|
|
||||||
|
elog(TRACE, "Page sent back for block %u", blknum);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Input buffer backing buffered_read(), plus its fill level and read cursor */
static char stdin_buf[16 * 1024];
static size_t stdin_len = 0;	/* # of bytes in buffer */
static size_t stdin_ptr = 0;	/* # of bytes already consumed */

/*
 * Like read() on stdin, but buffered.
 *
 * libc's buffered fread() is not usable here, because it issues syscalls
 * that have been disabled with seccomp().  Depending on the platform, it
 * can call 'fstat' or 'newfstatat'.  'fstat' is probably harmless, but
 * 'newfstatat' seems problematic because it allows interrogating files by
 * path name.
 *
 * Returns the number of bytes stored in 'buf'.  On error, -1 is returned
 * and errno is set appropriately.  Unlike read(), the caller's buffer is
 * filled completely unless an error happens or EOF is reached.
 */
static ssize_t
buffered_read(void *buf, size_t count)
{
	char	   *out = buf;
	size_t		remaining = count;

	while (remaining > 0)
	{
		size_t		avail;
		size_t		chunk;

		/* Internal buffer drained: refill it from stdin. */
		if (stdin_ptr == stdin_len)
		{
			ssize_t		nread = read(STDIN_FILENO, stdin_buf, sizeof(stdin_buf));

			/* don't do anything here that could set 'errno' */
			if (nread < 0)
				return nread;

			/* EOF: hand back whatever has been copied so far */
			if (nread == 0)
				break;

			stdin_len = (size_t) nread;
			stdin_ptr = 0;
		}

		/* Copy as much as is both available and still wanted. */
		avail = stdin_len - stdin_ptr;
		chunk = Min(avail, remaining);
		memcpy(out, stdin_buf + stdin_ptr, chunk);

		stdin_ptr += chunk;
		remaining -= chunk;
		out += chunk;
	}

	return (out - (char *) buf);
}
|
||||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: bdd502a8da...e9b0010b45
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 64558b386b...5cd7e44799
Reference in New Issue
Block a user