mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-06 20:20:37 +00:00
Compare commits
1 Commits
hackathon/
...
embedded_w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
779508caa9 |
2
Makefile
2
Makefile
@@ -10,7 +10,7 @@ POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
|||||||
BUILD_TYPE ?= debug
|
BUILD_TYPE ?= debug
|
||||||
ifeq ($(BUILD_TYPE),release)
|
ifeq ($(BUILD_TYPE),release)
|
||||||
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
|
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
|
||||||
PG_CFLAGS = -O2 -g3 $(CFLAGS)
|
PG_CFLAGS = -fPIC -O2 -g3 $(CFLAGS)
|
||||||
# Unfortunately, `--profile=...` is a nightly feature
|
# Unfortunately, `--profile=...` is a nightly feature
|
||||||
CARGO_BUILD_FLAGS += --release
|
CARGO_BUILD_FLAGS += --release
|
||||||
else ifeq ($(BUILD_TYPE),debug)
|
else ifeq ($(BUILD_TYPE),debug)
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ pub struct PostgresRedoManager {
|
|||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
|
|
||||||
process: Mutex<Option<PostgresRedoProcess>>,
|
wal_redo_command: Mutex<libloading::Symbol<unsafe extern fn(cmd: u8, input: * const u8, size: u32, output: * u8)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Can this request be served by neon redo functions
|
/// Can this request be served by neon redo functions
|
||||||
@@ -201,11 +201,17 @@ impl PostgresRedoManager {
|
|||||||
///
|
///
|
||||||
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
|
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
|
||||||
// The actual process is launched lazily, on first request.
|
// The actual process is launched lazily, on first request.
|
||||||
PostgresRedoManager {
|
unsafe {
|
||||||
tenant_id,
|
let lib = libloading::Library::new("/home/knizhnik/zenith/pg_install/build/v14/src/backend/postgres.so").unwrap();
|
||||||
conf,
|
let main: libloading::Symbol<unsafe extern fn(arg0: * const u8, arg1: * const u8, arg2: * const u8, arg3: * const u8) -> u32> = lib.get(b"man").unwrap();
|
||||||
process: Mutex::new(None),
|
main(b"postgres".as_ptr(), b"--wal-redo".as_ptr(), std::ptr::null());
|
||||||
}
|
let wal_redo_command = lib.get(b"wal_redo_command").unwrap();
|
||||||
|
PostgresRedoManager {
|
||||||
|
tenant_id,
|
||||||
|
conf,
|
||||||
|
wal_redo_command: Mutex::new(wal_redo_command),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
@@ -725,10 +731,11 @@ impl PostgresRedoProcess {
|
|||||||
// This could be problematic if there are millions of records to replay,
|
// This could be problematic if there are millions of records to replay,
|
||||||
// but in practice the number of records is usually so small that it doesn't
|
// but in practice the number of records is usually so small that it doesn't
|
||||||
// matter, and it's better to keep this code simple.
|
// matter, and it's better to keep this code simple.
|
||||||
let mut writebuf: Vec<u8> = Vec::new();
|
let wal_redo_command = self.wal_redo_command.lock().unwrap();
|
||||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
let tag_data = tag.ser_into(buf).unwrap();
|
||||||
|
wal_redo_command(b'B', tag_data.as_ptr(), tag_data.len() as u32, std::ptr::null());
|
||||||
if let Some(img) = base_img {
|
if let Some(img) = base_img {
|
||||||
build_push_page_msg(tag, &img, &mut writebuf);
|
wal_redo_command(b'P', img.as_ptr(), img.len() as u32, std::ptr::null());
|
||||||
}
|
}
|
||||||
for (lsn, rec) in records.iter() {
|
for (lsn, rec) in records.iter() {
|
||||||
if let NeonWalRecord::Postgres {
|
if let NeonWalRecord::Postgres {
|
||||||
@@ -736,7 +743,10 @@ impl PostgresRedoProcess {
|
|||||||
rec: postgres_rec,
|
rec: postgres_rec,
|
||||||
} = rec
|
} = rec
|
||||||
{
|
{
|
||||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
let mut buf = Vec::new();
|
||||||
|
buf.put_u64(*lsn.0);
|
||||||
|
buf.put(postgres_rec);
|
||||||
|
wal_redo_command(b'A', buf.get_ptr(), buf.len() as u32, std::ptr::null());
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::new(
|
return Err(Error::new(
|
||||||
ErrorKind::Other,
|
ErrorKind::Other,
|
||||||
@@ -744,97 +754,10 @@ impl PostgresRedoProcess {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
build_get_page_msg(tag, &mut writebuf);
|
let mut page = Vec::new();
|
||||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
page.resize(BLCKSZ as usize, 0);
|
||||||
|
wal_redo_command(b'G', tag_data.as_ptr(), tag_data.len() as u32, page.as_mut_ptr());
|
||||||
// The input is now in 'writebuf'. Do a blind write first, writing as much as
|
Ok(Bytes::from(page))
|
||||||
// we can, before calling poll(). That skips one call to poll() if the stdin is
|
|
||||||
// already available for writing, which it almost certainly is because the
|
|
||||||
// process is idle.
|
|
||||||
let mut nwrite = self.stdin.write(&writebuf)?;
|
|
||||||
|
|
||||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
|
||||||
// into this buffer.
|
|
||||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
|
||||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
|
||||||
|
|
||||||
// Prepare for calling poll()
|
|
||||||
let mut pollfds = [
|
|
||||||
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
|
|
||||||
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
|
|
||||||
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
|
|
||||||
];
|
|
||||||
|
|
||||||
// We do three things simultaneously: send the old base image and WAL records to
|
|
||||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
|
||||||
// information that the child writes to its stderr to the page server's log.
|
|
||||||
while nresult < BLCKSZ.into() {
|
|
||||||
// If we have more data to write, wake up if 'stdin' becomes writeable or
|
|
||||||
// we have data to read. Otherwise only wake up if there's data to read.
|
|
||||||
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
|
|
||||||
let n = loop {
|
|
||||||
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
|
|
||||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
|
||||||
res => break res,
|
|
||||||
}
|
|
||||||
}?;
|
|
||||||
|
|
||||||
if n == 0 {
|
|
||||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have some messages in stderr, forward them to the log.
|
|
||||||
let err_revents = pollfds[1].revents().unwrap();
|
|
||||||
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
|
||||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
|
||||||
let n = self.stderr.read(&mut errbuf)?;
|
|
||||||
|
|
||||||
// The message might not be split correctly into lines here. But this is
|
|
||||||
// good enough, the important thing is to get the message to the log.
|
|
||||||
if n > 0 {
|
|
||||||
error!(
|
|
||||||
"wal-redo-postgres: {}",
|
|
||||||
String::from_utf8_lossy(&errbuf[0..n])
|
|
||||||
);
|
|
||||||
|
|
||||||
// To make sure we capture all log from the process if it fails, keep
|
|
||||||
// reading from the stderr, before checking the stdout.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
|
||||||
return Err(Error::new(
|
|
||||||
ErrorKind::BrokenPipe,
|
|
||||||
"WAL redo process closed its stderr unexpectedly",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have more data to write and 'stdin' is writeable, do write.
|
|
||||||
if nwrite < writebuf.len() {
|
|
||||||
let in_revents = pollfds[2].revents().unwrap();
|
|
||||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
|
||||||
nwrite += self.stdin.write(&writebuf[nwrite..])?;
|
|
||||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
|
||||||
// We still have more data to write, but the process closed the pipe.
|
|
||||||
return Err(Error::new(
|
|
||||||
ErrorKind::BrokenPipe,
|
|
||||||
"WAL redo process closed its stdin unexpectedly",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have some data in stdout, read it to the result buffer.
|
|
||||||
let out_revents = pollfds[0].revents().unwrap();
|
|
||||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
|
||||||
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
|
|
||||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
|
||||||
return Err(Error::new(
|
|
||||||
ErrorKind::BrokenPipe,
|
|
||||||
"WAL redo process closed its stdout unexpectedly",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Bytes::from(resultbuf))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -93,6 +93,8 @@
|
|||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
|
void wal_redo_command(char cmd, char const* input, int size, char* output);
|
||||||
|
|
||||||
static int ReadRedoCommand(StringInfo inBuf);
|
static int ReadRedoCommand(StringInfo inBuf);
|
||||||
static void BeginRedoForBlock(StringInfo input_message);
|
static void BeginRedoForBlock(StringInfo input_message);
|
||||||
static void PushPage(StringInfo input_message);
|
static void PushPage(StringInfo input_message);
|
||||||
@@ -282,7 +284,7 @@ WalRedoMain(int argc, char *argv[])
|
|||||||
if (enable_seccomp)
|
if (enable_seccomp)
|
||||||
enter_seccomp_mode();
|
enter_seccomp_mode();
|
||||||
#endif /* HAVE_LIBSECCOMP */
|
#endif /* HAVE_LIBSECCOMP */
|
||||||
|
#if 0
|
||||||
/*
|
/*
|
||||||
* Main processing loop
|
* Main processing loop
|
||||||
*/
|
*/
|
||||||
@@ -351,6 +353,7 @@ WalRedoMain(int argc, char *argv[])
|
|||||||
firstchar)));
|
firstchar)));
|
||||||
}
|
}
|
||||||
} /* end of input-reading loop */
|
} /* end of input-reading loop */
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -733,8 +736,8 @@ redo_block_filter(XLogReaderState *record, uint8 block_id)
|
|||||||
*
|
*
|
||||||
* After applying some records.
|
* After applying some records.
|
||||||
*/
|
*/
|
||||||
static void
|
static void*
|
||||||
GetPage(StringInfo input_message)
|
GetPage(StringInfo input_message, char* dst)
|
||||||
{
|
{
|
||||||
RelFileNode rnode;
|
RelFileNode rnode;
|
||||||
ForkNumber forknum;
|
ForkNumber forknum;
|
||||||
@@ -763,8 +766,8 @@ GetPage(StringInfo input_message)
|
|||||||
buf = NeonRedoReadBuffer(rnode, forknum, blknum, RBM_NORMAL);
|
buf = NeonRedoReadBuffer(rnode, forknum, blknum, RBM_NORMAL);
|
||||||
Assert(buf == wal_redo_buffer);
|
Assert(buf == wal_redo_buffer);
|
||||||
page = BufferGetPage(buf);
|
page = BufferGetPage(buf);
|
||||||
/* single thread, so don't bother locking the page */
|
memcpy(dst, page, BLCKSZ);
|
||||||
|
#if 0
|
||||||
/* Response: Page content */
|
/* Response: Page content */
|
||||||
tot_written = 0;
|
tot_written = 0;
|
||||||
do {
|
do {
|
||||||
@@ -781,7 +784,7 @@ GetPage(StringInfo input_message)
|
|||||||
}
|
}
|
||||||
tot_written += rc;
|
tot_written += rc;
|
||||||
} while (tot_written < BLCKSZ);
|
} while (tot_written < BLCKSZ);
|
||||||
|
#endif
|
||||||
ReleaseBuffer(buf);
|
ReleaseBuffer(buf);
|
||||||
DropRelFileNodeAllLocalBuffers(rnode);
|
DropRelFileNodeAllLocalBuffers(rnode);
|
||||||
wal_redo_buffer = InvalidBuffer;
|
wal_redo_buffer = InvalidBuffer;
|
||||||
@@ -845,3 +848,31 @@ buffered_read(void *buf, size_t count)
|
|||||||
|
|
||||||
return (dst - (char *) buf);
|
return (dst - (char *) buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void wal_redo_input(char cmd, char const* input, int size, char* output)
|
||||||
|
{
|
||||||
|
StringInfoData input_message;
|
||||||
|
input_message.data = input;
|
||||||
|
input_message.len = input_message.maxlen = size;
|
||||||
|
input_message.cursor = 0;
|
||||||
|
|
||||||
|
switch (cmd)
|
||||||
|
{
|
||||||
|
case 'B': /* BeginRedoForBlock */
|
||||||
|
BeginRedoForBlock(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'P': /* PushPage */
|
||||||
|
PushPage(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'A': /* ApplyRecord */
|
||||||
|
ApplyRecord(&input_message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'G':
|
||||||
|
GetPage(&input_message, output);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user