mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
Simplify code to build walredo messages.
No need to use BytesMut in these functions. Plain Vec is simpler. And should be marginally faster too; I saw BytesMut functions previously in 'perf' profile, consuming around 5% of the overall pageserver CPU time. That's gone with this patch, although I don't see any discernible difference in the overall performance test results.
This commit is contained in:
@@ -565,22 +565,16 @@ impl PostgresRedoProcess {
|
||||
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
|
||||
)
|
||||
.await??;
|
||||
if base_img.is_some() {
|
||||
timeout(
|
||||
TIMEOUT,
|
||||
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
|
||||
)
|
||||
.await??;
|
||||
if let Some(img) = base_img {
|
||||
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
|
||||
}
|
||||
|
||||
// Send WAL records.
|
||||
for rec in records.iter() {
|
||||
let r = rec.clone();
|
||||
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
stdin
|
||||
.write_all(&build_apply_record_msg(r.lsn, r.rec))
|
||||
.write_all(&build_apply_record_msg(rec.lsn, &rec.rec))
|
||||
.await?;
|
||||
|
||||
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
|
||||
@@ -617,58 +611,41 @@ impl PostgresRedoProcess {
|
||||
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
||||
// explanation of the protocol.
|
||||
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(len as u32);
|
||||
|
||||
// FIXME: this is a temporary hack that should go away when we refactor
|
||||
// the postgres protocol serialization + handlers.
|
||||
//
|
||||
// BytesMut is a dynamic growable buffer, used a lot in tokio code but
|
||||
// not in the std library. To write to a BytesMut from a serde serializer,
|
||||
// we need to either:
|
||||
// - pre-allocate the required buffer space. This is annoying because we
|
||||
// shouldn't care what the exact serialized size is-- that's the
|
||||
// serializer's job.
|
||||
// - Or, we need to create a temporary "writer" (which implements the
|
||||
// `Write` trait). It's a bit awkward, because the writer consumes the
|
||||
// underlying BytesMut, and we need to extract it later with
|
||||
// `into_inner`.
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
tag.ser_into(&mut buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let buf = writer.into_inner();
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
|
||||
assert!(base_img.len() == 8192);
|
||||
|
||||
let len = 4 + 1 + 4 * 4 + base_img.len();
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(len as u32);
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
tag.ser_into(&mut buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let mut buf = writer.into_inner();
|
||||
buf.put(base_img);
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
|
||||
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
|
||||
let len = 4 + 8 + rec.len();
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'A');
|
||||
buf.put_u32(len as u32);
|
||||
@@ -677,21 +654,19 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(len as u32);
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
tag.ser_into(&mut buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let buf = writer.into_inner();
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user