Use write_all() when sending messages to Postgres WAL process.

tokio::io:AsyncWrite.read() function will do a short write, if the pipe's
buffer is full.
This commit is contained in:
Heikki Linnakangas
2021-03-30 12:05:45 +03:00
committed by Stas Kelvich
parent 1348915655
commit 98fd4aeffe

View File

@@ -193,16 +193,17 @@ impl WalRedoProcess {
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(TIMEOUT, stdin.write(&build_begin_redo_for_block_msg(tag))).await??;
timeout(TIMEOUT, stdin.write_all(&build_begin_redo_for_block_msg(tag))).await??;
if base_img.is_some() {
timeout(TIMEOUT, stdin.write(&build_push_page_msg(tag, base_img.unwrap()))).await??;
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, base_img.unwrap()))).await??;
}
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
timeout(TIMEOUT, stdin.write(&build_apply_record_msg(r.lsn, r.rec))).await??;
stdin.write_all(&build_apply_record_msg(r.lsn, r.rec)).await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
}
@@ -210,7 +211,7 @@ impl WalRedoProcess {
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
@@ -240,16 +241,19 @@ impl WalRedoProcess {
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes
{
let mut buf = BytesMut::new();
let len = 4 + 5*4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('B' as u8);
buf.put_u32(4 + 5*4);
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
assert!(buf.len() == 1 + len);
return buf.freeze();
}
@@ -257,10 +261,11 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
{
assert!(base_img.len() == 8192);
let mut buf = BytesMut::new();
let len = 4 + 5*4 + base_img.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('P' as u8);
buf.put_u32(4 + 5*4 + base_img.len() as u32);
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
@@ -268,32 +273,39 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
buf.put_u32(tag.blknum);
buf.put(base_img);
assert!(buf.len() == 1 + len);
return buf.freeze();
}
fn build_apply_record_msg(lsn: u64, rec: Bytes) -> Bytes {
let mut buf = BytesMut::new();
let len = 4 + 8 + rec.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('A' as u8);
buf.put_u32(4 + 8 + rec.len() as u32);
buf.put_u32(len as u32);
buf.put_u64(lsn);
buf.put(rec);
assert!(buf.len() == 1 + len);
return buf.freeze();
}
fn build_get_page_msg(tag: BufferTag, ) -> Bytes {
let mut buf = BytesMut::new();
fn build_get_page_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5*4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('G' as u8);
buf.put_u32(4 + 5*4);
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
assert!(buf.len() == 1 + len);
return buf.freeze();
}