Compare commits

...

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
779508caa9 Perform wal-redo by direct call of Postgres redo functions 2022-11-07 11:09:44 +02:00
3 changed files with 62 additions and 108 deletions

View File

@@ -10,7 +10,7 @@ POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
BUILD_TYPE ?= debug
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS = -O2 -g3 $(CFLAGS)
PG_CFLAGS = -fPIC -O2 -g3 $(CFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
else ifeq ($(BUILD_TYPE),debug)

View File

@@ -99,7 +99,7 @@ pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
wal_redo_command: Mutex<libloading::Symbol<unsafe extern fn(cmd: u8, input: * const u8, size: u32, output: * u8)>>,
}
/// Can this request be served by neon redo functions
@@ -201,11 +201,17 @@ impl PostgresRedoManager {
///
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
}
unsafe {
let lib = libloading::Library::new("/home/knizhnik/zenith/pg_install/build/v14/src/backend/postgres.so").unwrap();
let main: libloading::Symbol<unsafe extern fn(arg0: * const u8, arg1: * const u8, arg2: * const u8, arg3: * const u8) -> u32> = lib.get(b"man").unwrap();
main(b"postgres".as_ptr(), b"--wal-redo".as_ptr(), std::ptr::null());
let wal_redo_command = lib.get(b"wal_redo_command").unwrap();
PostgresRedoManager {
tenant_id,
conf,
wal_redo_command: Mutex::new(wal_redo_command),
}
}
}
///
@@ -725,10 +731,11 @@ impl PostgresRedoProcess {
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
let wal_redo_command = self.wal_redo_command.lock().unwrap();
let tag_data = tag.ser_into(buf).unwrap();
wal_redo_command(b'B', tag_data.as_ptr(), tag_data.len() as u32, std::ptr::null());
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
wal_redo_command(b'P', img.as_ptr(), img.len() as u32, std::ptr::null());
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
@@ -736,7 +743,10 @@ impl PostgresRedoProcess {
rec: postgres_rec,
} = rec
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
let mut buf = Vec::new();
buf.put_u64(*lsn.0);
buf.put(postgres_rec);
wal_redo_command(b'A', buf.get_ptr(), buf.len() as u32, std::ptr::null());
} else {
return Err(Error::new(
ErrorKind::Other,
@@ -744,97 +754,10 @@ impl PostgresRedoProcess {
));
}
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = loop {
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(Bytes::from(resultbuf))
let mut page = Vec::new();
page.resize(BLCKSZ as usize, 0);
wal_redo_command(b'G', tag_data.as_ptr(), tag_data.len() as u32, page.as_mut_ptr());
Ok(Bytes::from(page))
}
}

View File

@@ -93,6 +93,8 @@
PG_MODULE_MAGIC;
void wal_redo_command(char cmd, char const* input, int size, char* output);
static int ReadRedoCommand(StringInfo inBuf);
static void BeginRedoForBlock(StringInfo input_message);
static void PushPage(StringInfo input_message);
@@ -282,7 +284,7 @@ WalRedoMain(int argc, char *argv[])
if (enable_seccomp)
enter_seccomp_mode();
#endif /* HAVE_LIBSECCOMP */
#if 0
/*
* Main processing loop
*/
@@ -351,6 +353,7 @@ WalRedoMain(int argc, char *argv[])
firstchar)));
}
} /* end of input-reading loop */
#endif
}
@@ -733,8 +736,8 @@ redo_block_filter(XLogReaderState *record, uint8 block_id)
*
* After applying some records.
*/
static void
GetPage(StringInfo input_message)
static void*
GetPage(StringInfo input_message, char* dst)
{
RelFileNode rnode;
ForkNumber forknum;
@@ -763,8 +766,8 @@ GetPage(StringInfo input_message)
buf = NeonRedoReadBuffer(rnode, forknum, blknum, RBM_NORMAL);
Assert(buf == wal_redo_buffer);
page = BufferGetPage(buf);
/* single thread, so don't bother locking the page */
memcpy(dst, page, BLCKSZ);
#if 0
/* Response: Page content */
tot_written = 0;
do {
@@ -781,7 +784,7 @@ GetPage(StringInfo input_message)
}
tot_written += rc;
} while (tot_written < BLCKSZ);
#endif
ReleaseBuffer(buf);
DropRelFileNodeAllLocalBuffers(rnode);
wal_redo_buffer = InvalidBuffer;
@@ -845,3 +848,31 @@ buffered_read(void *buf, size_t count)
return (dst - (char *) buf);
}
/*
 * wal_redo_command
 *
 * Entry point for executing one WAL redo command by direct call.
 *
 * The definition carries the same name as the prototype declared near the
 * top of this file, so that the exported symbol is the one callers look up.
 *
 * cmd    - command code: 'B' (BeginRedoForBlock), 'P' (PushPage),
 *          'A' (ApplyRecord) or 'G' (GetPage).
 * input  - message payload for the command, 'size' bytes long.
 * size   - length of 'input' in bytes.
 * output - destination buffer handed to GetPage for the 'G' command; it is
 *          not used by the other commands.
 */
void
wal_redo_command(char cmd, char const* input, int size, char* output)
{
	StringInfoData input_message;

	/*
	 * StringInfoData.data is a non-const pointer, so the qualifier has to be
	 * cast away explicitly. The handlers presumably only read from the
	 * message -- TODO confirm.
	 */
	input_message.data = (char *) input;
	input_message.len = input_message.maxlen = size;
	input_message.cursor = 0;

	switch (cmd)
	{
		case 'B':				/* BeginRedoForBlock */
			BeginRedoForBlock(&input_message);
			break;

		case 'P':				/* PushPage */
			PushPage(&input_message);
			break;

		case 'A':				/* ApplyRecord */
			ApplyRecord(&input_message);
			break;

		case 'G':				/* GetPage */
			GetPage(&input_message, output);
			break;

		default:
			/*
			 * NOTE(review): unknown command codes are ignored, exactly as
			 * before; consider reporting them once the error-handling
			 * behaviour in this calling mode is settled.
			 */
			break;
	}
}

/*
 * wal_redo_input
 *
 * Former name of wal_redo_command(); kept as a thin forwarding wrapper so
 * that anything still referring to the old name continues to work.
 */
void wal_redo_input(char cmd, char const* input, int size, char* output);

void
wal_redo_input(char cmd, char const* input, int size, char* output)
{
	wal_redo_command(cmd, input, size, output);
}