From 95a85312f549691a5acc448672955427df2aff1e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 11 Oct 2021 20:02:47 +0300 Subject: [PATCH] Simplify code to build walredo messages. No need to use BytesMut in these functions. Plain Vec is simpler. And should be marginally faster too; I saw BytesMut functions previously in 'perf' profile, consuming around 5% of the overall pageserver CPU time. That's gone with this patch, although I don't see any discernible difference in the overall performance test results. --- pageserver/src/walredo.rs | 61 ++++++++++++--------------------------- 1 file changed, 18 insertions(+), 43 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f233fceb3e..e9382c4da5 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -565,22 +565,16 @@ impl PostgresRedoProcess { stdin.write_all(&build_begin_redo_for_block_msg(tag)), ) .await??; - if base_img.is_some() { - timeout( - TIMEOUT, - stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), - ) - .await??; + if let Some(img) = base_img { + timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??; } // Send WAL records. for rec in records.iter() { - let r = rec.clone(); - WAL_REDO_RECORD_COUNTER.inc(); stdin - .write_all(&build_apply_record_msg(r.lsn, r.rec)) + .write_all(&build_apply_record_msg(rec.lsn, &rec.rec)) .await?; //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", @@ -617,58 +611,41 @@ impl PostgresRedoProcess { // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for // explanation of the protocol. -fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { +fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec { let len = 4 + 1 + 4 * 4; - let mut buf = BytesMut::with_capacity(1 + len); + let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'B'); buf.put_u32(len as u32); - // FIXME: this is a temporary hack that should go away when we refactor - // the postgres protocol serialization + handlers. - // - // BytesMut is a dynamic growable buffer, used a lot in tokio code but - // not in the std library. To write to a BytesMut from a serde serializer, - // we need to either: - // - pre-allocate the required buffer space. This is annoying because we - // shouldn't care what the exact serialized size is-- that's the - // serializer's job. - // - Or, we need to create a temporary "writer" (which implements the - // `Write` trait). It's a bit awkward, because the writer consumes the - // underlying BytesMut, and we need to extract it later with - // `into_inner`. - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + tag.ser_into(&mut buf) .expect("serialize BufferTag should always succeed"); - let buf = writer.into_inner(); debug_assert!(buf.len() == 1 + len); - buf.freeze() + buf } -fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { +fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec { assert!(base_img.len() == 8192); let len = 4 + 1 + 4 * 4 + base_img.len(); - let mut buf = BytesMut::with_capacity(1 + len); + let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'P'); buf.put_u32(len as u32); - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + tag.ser_into(&mut buf) .expect("serialize BufferTag should always succeed"); - let mut buf = writer.into_inner(); buf.put(base_img); debug_assert!(buf.len() == 1 + len); - buf.freeze() + buf } -fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { +fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec { let len = 4 + 8 + rec.len(); - let mut buf = BytesMut::with_capacity(1 + len); + let mut buf: Vec = Vec::with_capacity(1 + len); buf.put_u8(b'A'); buf.put_u32(len as u32); @@ -677,21 +654,19 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { debug_assert!(buf.len() == 1 + len); - buf.freeze() + buf } -fn build_get_page_msg(tag: BufferTag) -> Bytes { +fn build_get_page_msg(tag: BufferTag) -> Vec { let len = 4 + 1 + 4 * 4; - let mut buf = BytesMut::with_capacity(1 + len); + let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'G'); buf.put_u32(len as u32); - let mut writer = buf.writer(); - tag.ser_into(&mut writer) + tag.ser_into(&mut buf) .expect("serialize BufferTag should always succeed"); - let buf = writer.into_inner(); debug_assert!(buf.len() == 1 + len); - buf.freeze() + buf }