Refactor functions for constructing WAL redo messages.

Instead of building a separate Vec<u8> to hold each message, serialize all
the messages to one big Vec<u8>. This eliminates some Vec allocation and
memcpy() overhead. The downside is that if there are a lot of records to
replay, we have to serialize them all into one big chunk of memory.
That shouldn't be a problem in practice. If you need to replay millions
of records to reconstruct a page, we should've materialized a new image
of that page earlier already.
This commit is contained in:
Heikki Linnakangas
2021-11-04 10:39:00 +02:00
parent 086a02ab92
commit 3a0111c75e

View File

@@ -283,9 +283,9 @@ impl PostgresRedoManager {
let duration = start.elapsed();
debug!(
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}",
"postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
duration.as_micros(),
lsn
);
@@ -577,8 +577,23 @@ impl PostgresRedoProcess {
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
// Buffer the writes to avoid a lot of small syscalls.
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
let stdin = &mut self.stdin;
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
@@ -590,33 +605,7 @@ impl PostgresRedoProcess {
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
if let Some(img) = base_img {
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
}
// Send WAL records.
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
stdin
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
timeout(TIMEOUT, stdin.write_all(&writebuf)).await??;
Ok::<(), Error>(())
};
@@ -625,7 +614,6 @@ impl PostgresRedoProcess {
let mut buf = [0u8; 8192];
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
@@ -641,62 +629,42 @@ impl PostgresRedoProcess {
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'B');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
assert!(base_img.len() == 8192);
let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'P');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
buf.put(base_img);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
let len = 4 + 8 + rec.len();
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u32(len as u32);
buf.put_u64(endlsn.0);
buf.put(rec);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'G');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}