diff --git a/pageserver/src/walredo/mod.rs b/pageserver/src/walredo/mod.rs index 0d2aee27a7..11a0adb9b9 100644 --- a/pageserver/src/walredo/mod.rs +++ b/pageserver/src/walredo/mod.rs @@ -242,20 +242,17 @@ impl PostgresRedoManager { process: &mut PostgresRedoProcess, request: &WalRedoRequest, ) -> Result { - let rel = request.rel; - let blknum = request.blknum; - let lsn = request.lsn; - let base_img = request.base_img.clone(); - let records = &request.records; - - let nrecords = records.len(); - let start = Instant::now(); - let apply_result = if let RelishTag::Relation(rel) = rel { + let apply_result = if let RelishTag::Relation(rel) = request.rel { // Relational WAL records are applied using wal-redo-postgres - let buf_tag = BufferTag { rel, blknum }; - process.apply_wal_records(buf_tag, base_img, records).await + let buf_tag = BufferTag { + rel, + blknum: request.blknum, + }; + process + .apply_wal_records(buf_tag, &request.base_img, &request.records) + .await } else { Ok(nonrel::apply_nonrel(request)) }; @@ -266,9 +263,9 @@ impl PostgresRedoManager { debug!( "applied {} WAL records in {} ms to reconstruct page image at LSN {}", - nrecords, + request.records.len(), duration.as_millis(), - lsn + request.lsn ); if let Err(e) = apply_result { @@ -393,12 +390,11 @@ impl PostgresRedoProcess { async fn apply_wal_records( &mut self, tag: BufferTag, - base_img: Option, + base_img: &Option, records: &[WALRecord], ) -> Result { + let stdin = &mut self.stdin; let stdout = &mut self.stdout; - // Buffer the writes to avoid a lot of small syscalls. - let mut stdin = tokio::io::BufWriter::new(&mut self.stdin); // We do three things simultaneously: send the old base image and WAL records to // the child process's stdin, read the result from child's stdout, and forward any logging @@ -408,58 +404,47 @@ impl PostgresRedoProcess { // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the // tokio runtime in the 'launch' function already, forwards the logging. let f_stdin = async { - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) - timeout( - TIMEOUT, - stdin.write_all(&build_begin_redo_for_block_msg(tag)), - ) - .await??; + let mut capacity = 1 + BEGIN_REDO_MSG_LEN; if base_img.is_some() { - timeout( - TIMEOUT, - stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), - ) - .await??; + capacity += 1 + PUSH_PAGE_MSG_LEN; + } + capacity += (1 + APPLY_MSG_HEADER_LEN) * records.len(); + capacity += records.iter().map(|rec| rec.rec.len()).sum::(); + capacity += 1 + GET_PAGE_MSG_LEN; + + let mut buf = BytesMut::with_capacity(capacity); + + build_begin_redo_for_block_msg(&mut buf, tag); + + if let Some(base_img) = base_img.as_ref() { + build_push_page_msg(&mut buf, tag, base_img); } - // Send WAL records. - for rec in records.iter() { - let r = rec.clone(); - - WAL_REDO_RECORD_COUNTER.inc(); - - stdin - .write_all(&build_apply_record_msg(r.lsn, r.rec)) - .await?; - - //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", - // r.lsn >> 32, r.lsn & 0xffff_ffff); + for record in records { + build_apply_record_msg(&mut buf, record.lsn, &record.rec); } - //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", - // records.len(), lsn >> 32, lsn & 0xffff_ffff); - // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; + build_get_page_msg(&mut buf, tag); + + debug_assert_eq!(capacity, buf.len()); + + timeout(TIMEOUT, stdin.write_all(&buf)).await??; timeout(TIMEOUT, stdin.flush()).await??; - //debug!("sent GetPage for {}", tag.blknum); + Ok::<(), Error>(()) }; // Read back new page image let f_stdout = async { - let mut buf = [0u8; 8192]; + let mut buf = vec![0u8; 8192]; timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; //debug!("got response for {}", tag.blknum); - Ok::<[u8; 8192], Error>(buf) + Ok::, Error>(buf) }; - let res = tokio::try_join!(f_stdout, f_stdin)?; - - let buf = res.0; - - Ok::(Bytes::from(std::vec::Vec::from(buf))) + let (buf, _) = tokio::try_join!(f_stdout, f_stdin)?; + Ok::(Bytes::from(buf)) } } @@ -467,81 +452,44 @@ impl PostgresRedoProcess { // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for // explanation of the protocol. -fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { - let len = 4 + 1 + 4 * 4; - let mut buf = BytesMut::with_capacity(1 + len); +const TAG_LEN: usize = 4 * 4; +const PAGE_SIZE: usize = 8192; +const BEGIN_REDO_MSG_LEN: usize = 4 + 1 + TAG_LEN; +const PUSH_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN + PAGE_SIZE; +const APPLY_MSG_HEADER_LEN: usize = 4 + 8; +const GET_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN; +fn build_begin_redo_for_block_msg(buf: &mut BytesMut, tag: BufferTag) { buf.put_u8(b'B'); - buf.put_u32(len as u32); + buf.put_u32(BEGIN_REDO_MSG_LEN as u32); - // FIXME: this is a temporary hack that should go away when we refactor - // the postgres protocol serialization + handlers. - // - // BytesMut is a dynamic growable buffer, used a lot in tokio code but - // not in the std library. To write to a BytesMut from a serde serializer, - // we need to either: - // - pre-allocate the required buffer space. This is annoying because we - // shouldn't care what the exact serialized size is-- that's the - // serializer's job. - // - Or, we need to create a temporary "writer" (which implements the - // `Write` trait). It's a bit awkward, because the writer consumes the - // underlying BytesMut, and we need to extract it later with - // `into_inner`. - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + tag.ser_into(&mut buf.writer()) .expect("serialize BufferTag should always succeed"); - let buf = writer.into_inner(); - - debug_assert!(buf.len() == 1 + len); - - buf.freeze() } -fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { - assert!(base_img.len() == 8192); - - let len = 4 + 1 + 4 * 4 + base_img.len(); - let mut buf = BytesMut::with_capacity(1 + len); +fn build_push_page_msg(buf: &mut BytesMut, tag: BufferTag, base_img: &Bytes) { + debug_assert_eq!(base_img.len(), PAGE_SIZE); buf.put_u8(b'P'); - buf.put_u32(len as u32); - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + buf.put_u32(PUSH_PAGE_MSG_LEN as u32); + tag.ser_into(&mut buf.writer()) .expect("serialize BufferTag should always succeed"); - let mut buf = writer.into_inner(); - buf.put(base_img); - - debug_assert!(buf.len() == 1 + len); - - buf.freeze() + buf.extend(base_img); } -fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { - let len = 4 + 8 + rec.len(); - let mut buf = BytesMut::with_capacity(1 + len); - +fn build_apply_record_msg(buf: &mut BytesMut, endlsn: Lsn, rec: &Bytes) { buf.put_u8(b'A'); + + let len = APPLY_MSG_HEADER_LEN + rec.len(); buf.put_u32(len as u32); + buf.put_u64(endlsn.0); - buf.put(rec); - - debug_assert!(buf.len() == 1 + len); - - buf.freeze() + buf.extend(rec); } -fn build_get_page_msg(tag: BufferTag) -> Bytes { - let len = 4 + 1 + 4 * 4; - let mut buf = BytesMut::with_capacity(1 + len); - +fn build_get_page_msg(buf: &mut BytesMut, tag: BufferTag) { buf.put_u8(b'G'); - buf.put_u32(len as u32); - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + buf.put_u32(GET_PAGE_MSG_LEN as u32); + tag.ser_into(&mut buf.writer()) .expect("serialize BufferTag should always succeed"); - let buf = writer.into_inner(); - - debug_assert!(buf.len() == 1 + len); - - buf.freeze() } diff --git a/pageserver/src/walredo/nonrel.rs b/pageserver/src/walredo/nonrel.rs index 69ec328e74..657ea87b16 100644 --- a/pageserver/src/walredo/nonrel.rs +++ b/pageserver/src/walredo/nonrel.rs @@ -18,14 +18,12 @@ use super::WalRedoRequest; pub(super) fn apply_nonrel(request: &WalRedoRequest) -> Bytes { let rel = request.rel; let blknum = request.blknum; - let base_img = request.base_img.clone(); - let records = &request.records; // Non-relational WAL records are handled here, with custom code that has the // same effects as the corresponding Postgres WAL redo function. const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); - if let Some(fpi) = base_img { + if let Some(fpi) = &request.base_img { // If full-page image is provided, then use it... page.extend_from_slice(&fpi[..]); } else { @@ -33,7 +31,7 @@ pub(super) fn apply_nonrel(request: &WalRedoRequest) -> Bytes { page.extend_from_slice(&ZERO_PAGE); } // Apply all collected WAL records - for record in records { + for record in &request.records { let mut buf = record.rec.clone(); super::WAL_REDO_RECORD_COUNTER.inc();