diff --git a/src/walredo.rs b/src/walredo.rs index 5ea7e8b238..c19347a5e5 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -193,16 +193,17 @@ impl WalRedoProcess { let f_stdin = async { // Send base image, if any. (If the record initializes the page, previous page // version is not needed.) - timeout(TIMEOUT, stdin.write(&build_begin_redo_for_block_msg(tag))).await??; + timeout(TIMEOUT, stdin.write_all(&build_begin_redo_for_block_msg(tag))).await??; if base_img.is_some() { - timeout(TIMEOUT, stdin.write(&build_push_page_msg(tag, base_img.unwrap()))).await??; + timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, base_img.unwrap()))).await??; } // Send WAL records. for rec in records.iter() { let r = rec.clone(); - timeout(TIMEOUT, stdin.write(&build_apply_record_msg(r.lsn, r.rec))).await??; + stdin.write_all(&build_apply_record_msg(r.lsn, r.rec)).await?; + //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", // r.lsn >> 32, r.lsn & 0xffff_ffff); } @@ -210,7 +211,7 @@ impl WalRedoProcess { // records.len(), lsn >> 32, lsn & 0xffff_ffff); // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write(&build_get_page_msg(tag))).await??; + timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; timeout(TIMEOUT, stdin.flush()).await??; //debug!("sent GetPage for {}", tag.blknum); Ok::<(), Error>(()) @@ -240,16 +241,19 @@ impl WalRedoProcess { fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { - let mut buf = BytesMut::new(); + let len = 4 + 5*4; + let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8('B' as u8); - buf.put_u32(4 + 5*4); + buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); buf.put_u32(tag.forknum as u32); buf.put_u32(tag.blknum); + assert!(buf.len() == 1 + len); + return buf.freeze(); } @@ -257,10 +261,11 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { assert!(base_img.len() == 8192); - let mut buf = BytesMut::new(); + let len = 4 + 5*4 + base_img.len(); + let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8('P' as u8); - buf.put_u32(4 + 5*4 + base_img.len() as u32); + buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); @@ -268,32 +273,39 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes buf.put_u32(tag.blknum); buf.put(base_img); + assert!(buf.len() == 1 + len); + return buf.freeze(); } fn build_apply_record_msg(lsn: u64, rec: Bytes) -> Bytes { - let mut buf = BytesMut::new(); + let len = 4 + 8 + rec.len(); + let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8('A' as u8); - buf.put_u32(4 + 8 + rec.len() as u32); + buf.put_u32(len as u32); buf.put_u64(lsn); buf.put(rec); + assert!(buf.len() == 1 + len); + return buf.freeze(); } -fn build_get_page_msg(tag: BufferTag, ) -> Bytes { - - let mut buf = BytesMut::new(); +fn build_get_page_msg(tag: BufferTag) -> Bytes { + let len = 4 + 5*4; + let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8('G' as u8); - buf.put_u32(4 + 5*4); + buf.put_u32(len as u32); buf.put_u32(tag.spcnode); buf.put_u32(tag.dbnode); buf.put_u32(tag.relnode); buf.put_u32(tag.forknum as u32); buf.put_u32(tag.blknum); + assert!(buf.len() == 1 + len); + return buf.freeze(); }