From 3a0111c75e30b9d555a28b4cb1dcd90f63ef637a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 4 Nov 2021 10:39:00 +0200 Subject: [PATCH] Refactor functions for constructing WAL redo messages. Instead of building a separate Vec to hold each message, serialize all the messages to one big Vec. This eliminates some Vec allocation and memcpy() overhead. The downside is that if there are a lot of records to replay, we have to serialize them all into one big chunk of memory. That shouldn't be a problem in practice. If you need to replay millions of records to reconstruct a page, we should've materialized a new image of that page earlier already. --- pageserver/src/walredo.rs | 86 ++++++++++++--------------------------- 1 file changed, 27 insertions(+), 59 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4ceb4e2b37..50a06f069c 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -283,9 +283,9 @@ impl PostgresRedoManager { let duration = start.elapsed(); debug!( - "postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}", + "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", nrecords, - duration.as_millis(), + duration.as_micros(), lsn ); @@ -577,8 +577,23 @@ impl PostgresRedoProcess { records: &[(Lsn, WALRecord)], ) -> Result { let stdout = &mut self.stdout; - // Buffer the writes to avoid a lot of small syscalls. - let mut stdin = tokio::io::BufWriter::new(&mut self.stdin); + let stdin = &mut self.stdin; + + // Serialize all the messages to send the WAL redo process first. + // + // This could be problematic if there are millions of records to replay, + // but in practice the number of records is usually so small that it doesn't + // matter, and it's better to keep this code simple. + let mut writebuf: Vec = Vec::new(); + build_begin_redo_for_block_msg(tag, &mut writebuf); + if let Some(img) = base_img { + build_push_page_msg(tag, &img, &mut writebuf); + } + for (lsn, rec) in records.iter() { + build_apply_record_msg(*lsn, &rec.rec, &mut writebuf); + } + build_get_page_msg(tag, &mut writebuf); + WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); // We do three things simultaneously: send the old base image and WAL records to // the child process's stdin, read the result from child's stdout, and forward any logging @@ -590,33 +605,7 @@ impl PostgresRedoProcess { let f_stdin = async { // Send base image, if any. (If the record initializes the page, previous page // version is not needed.) - timeout( - TIMEOUT, - stdin.write_all(&build_begin_redo_for_block_msg(tag)), - ) - .await??; - if let Some(img) = base_img { - timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??; - } - - // Send WAL records. - for (lsn, rec) in records.iter() { - WAL_REDO_RECORD_COUNTER.inc(); - - stdin - .write_all(&build_apply_record_msg(*lsn, &rec.rec)) - .await?; - - //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", - // r.lsn >> 32, r.lsn & 0xffff_ffff); - } - //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", - // records.len(), lsn >> 32, lsn & 0xffff_ffff); - - // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; - timeout(TIMEOUT, stdin.flush()).await??; - //debug!("sent GetPage for {}", tag.blknum); + timeout(TIMEOUT, stdin.write_all(&writebuf)).await??; Ok::<(), Error>(()) }; @@ -625,7 +614,6 @@ impl PostgresRedoProcess { let mut buf = [0u8; 8192]; timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; - //debug!("got response for {}", tag.blknum); Ok::<[u8; 8192], Error>(buf) }; @@ -641,62 +629,42 @@ impl PostgresRedoProcess { // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for // explanation of the protocol. -fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec { +fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { let len = 4 + 1 + 4 * 4; - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'B'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); - - debug_assert!(buf.len() == 1 + len); - - buf } -fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec { +fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { assert!(base_img.len() == 8192); let len = 4 + 1 + 4 * 4 + base_img.len(); - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'P'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); buf.put(base_img); - - debug_assert!(buf.len() == 1 + len); - - buf } -fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec { +fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { let len = 4 + 8 + rec.len(); - let mut buf: Vec = Vec::with_capacity(1 + len); buf.put_u8(b'A'); buf.put_u32(len as u32); buf.put_u64(endlsn.0); buf.put(rec); - - debug_assert!(buf.len() == 1 + len); - - buf } -fn build_get_page_msg(tag: BufferTag) -> Vec { +fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { let len = 4 + 1 + 4 * 4; - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'G'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); - - debug_assert!(buf.len() == 1 + len); - - buf }