mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
pageserver - walredo request serialization module
This commit is contained in:
@@ -19,8 +19,9 @@
|
||||
//! process, he cannot escape out of it.
|
||||
|
||||
mod nonrel;
|
||||
mod request;
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use serde::Serialize;
|
||||
@@ -38,7 +39,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::{ChildStdin, ChildStdout, Command};
|
||||
use tokio::time::timeout;
|
||||
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
|
||||
@@ -404,29 +404,7 @@ impl PostgresRedoProcess {
|
||||
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
|
||||
// tokio runtime in the 'launch' function already, forwards the logging.
|
||||
let f_stdin = async {
|
||||
let mut capacity = 1 + BEGIN_REDO_MSG_LEN;
|
||||
if base_img.is_some() {
|
||||
capacity += 1 + PUSH_PAGE_MSG_LEN;
|
||||
}
|
||||
capacity += (1 + APPLY_MSG_HEADER_LEN) * records.len();
|
||||
capacity += records.iter().map(|rec| rec.rec.len()).sum::<usize>();
|
||||
capacity += 1 + GET_PAGE_MSG_LEN;
|
||||
|
||||
let mut buf = BytesMut::with_capacity(capacity);
|
||||
|
||||
build_begin_redo_for_block_msg(&mut buf, tag);
|
||||
|
||||
if let Some(base_img) = base_img.as_ref() {
|
||||
build_push_page_msg(&mut buf, tag, base_img);
|
||||
}
|
||||
|
||||
for record in records {
|
||||
build_apply_record_msg(&mut buf, record.lsn, &record.rec);
|
||||
}
|
||||
|
||||
build_get_page_msg(&mut buf, tag);
|
||||
|
||||
debug_assert_eq!(capacity, buf.len());
|
||||
let buf = request::serialize_request(tag, base_img, records);
|
||||
|
||||
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
|
||||
timeout(TIMEOUT, stdin.flush()).await??;
|
||||
@@ -447,49 +425,3 @@ impl PostgresRedoProcess {
|
||||
Ok::<Bytes, Error>(Bytes::from(buf))
|
||||
}
|
||||
}
|
||||
|
||||
// Functions for constructing messages to send to the postgres WAL redo
|
||||
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
||||
// explanation of the protocol.
|
||||
|
||||
const TAG_LEN: usize = 4 * 4;
|
||||
const PAGE_SIZE: usize = 8192;
|
||||
const BEGIN_REDO_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
const PUSH_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN + PAGE_SIZE;
|
||||
const APPLY_MSG_HEADER_LEN: usize = 4 + 8;
|
||||
const GET_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
|
||||
fn build_begin_redo_for_block_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(BEGIN_REDO_MSG_LEN as u32);
|
||||
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
}
|
||||
|
||||
fn build_push_page_msg(buf: &mut BytesMut, tag: BufferTag, base_img: &Bytes) {
|
||||
debug_assert_eq!(base_img.len(), PAGE_SIZE);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(PUSH_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
buf.extend(base_img);
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(buf: &mut BytesMut, endlsn: Lsn, rec: &Bytes) {
|
||||
buf.put_u8(b'A');
|
||||
|
||||
let len = APPLY_MSG_HEADER_LEN + rec.len();
|
||||
buf.put_u32(len as u32);
|
||||
|
||||
buf.put_u64(endlsn.0);
|
||||
buf.extend(rec);
|
||||
}
|
||||
|
||||
fn build_get_page_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(GET_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
}
|
||||
|
||||
86
pageserver/src/walredo/request.rs
Normal file
86
pageserver/src/walredo/request.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
///! Functions for constructing messages to send to the postgres WAL redo
|
||||
///! process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
||||
///! explanation of the protocol.
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
use crate::repository::WALRecord;
|
||||
|
||||
use super::BufferTag;
|
||||
|
||||
pub fn serialize_request(
|
||||
tag: BufferTag,
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[WALRecord],
|
||||
) -> BytesMut {
|
||||
let mut capacity = 1 + BEGIN_REDO_MSG_LEN;
|
||||
if base_img.is_some() {
|
||||
capacity += 1 + PUSH_PAGE_MSG_LEN;
|
||||
}
|
||||
capacity += (1 + APPLY_MSG_HEADER_LEN) * records.len();
|
||||
capacity += records.iter().map(|rec| rec.rec.len()).sum::<usize>();
|
||||
capacity += 1 + GET_PAGE_MSG_LEN;
|
||||
|
||||
let mut buf = BytesMut::with_capacity(capacity);
|
||||
|
||||
build_begin_redo_for_block_msg(&mut buf, tag);
|
||||
|
||||
if let Some(base_img) = base_img.as_ref() {
|
||||
build_push_page_msg(&mut buf, tag, base_img);
|
||||
}
|
||||
|
||||
for record in records {
|
||||
build_apply_record_msg(&mut buf, record.lsn, &record.rec);
|
||||
}
|
||||
|
||||
build_get_page_msg(&mut buf, tag);
|
||||
|
||||
debug_assert_eq!(capacity, buf.len());
|
||||
|
||||
buf
|
||||
}
|
||||
|
||||
const TAG_LEN: usize = 4 * 4;
|
||||
const PAGE_SIZE: usize = 8192;
|
||||
const BEGIN_REDO_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
const PUSH_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN + PAGE_SIZE;
|
||||
const APPLY_MSG_HEADER_LEN: usize = 4 + 8;
|
||||
const GET_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
|
||||
fn build_begin_redo_for_block_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(BEGIN_REDO_MSG_LEN as u32);
|
||||
|
||||
// TODO tag is serialized multiple times
|
||||
// let's try to serialize it just once
|
||||
// or make the protocol less repetitive
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
}
|
||||
|
||||
fn build_push_page_msg(buf: &mut BytesMut, tag: BufferTag, base_img: &Bytes) {
|
||||
debug_assert_eq!(base_img.len(), PAGE_SIZE);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(PUSH_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
buf.extend(base_img);
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(buf: &mut BytesMut, endlsn: Lsn, rec: &Bytes) {
|
||||
buf.put_u8(b'A');
|
||||
|
||||
let len = APPLY_MSG_HEADER_LEN + rec.len();
|
||||
buf.put_u32(len as u32);
|
||||
|
||||
buf.put_u64(endlsn.0);
|
||||
buf.extend(rec);
|
||||
}
|
||||
|
||||
fn build_get_page_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(GET_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
}
|
||||
Reference in New Issue
Block a user