mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
pageserver - reduce # of copies in walredo request
This commit is contained in:
@@ -249,22 +249,19 @@ impl PostgresRedoManager {
|
||||
process: &mut PostgresRedoProcess,
|
||||
request: &WalRedoRequest,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let rel = request.rel;
|
||||
let blknum = request.blknum;
|
||||
let lsn = request.lsn;
|
||||
let base_img = request.base_img.clone();
|
||||
let records = &request.records;
|
||||
|
||||
let nrecords = records.len();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
let apply_result = if let RelishTag::Relation(rel) = rel {
|
||||
let apply_result = if let RelishTag::Relation(rel) = request.rel {
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
process.apply_wal_records(buf_tag, base_img, records).await
|
||||
let buf_tag = BufferTag {
|
||||
rel,
|
||||
blknum: request.blknum,
|
||||
};
|
||||
process
|
||||
.apply_wal_records(buf_tag, &request.base_img, &request.records)
|
||||
.await
|
||||
} else {
|
||||
Ok(redo_nonrel(&request))
|
||||
Ok(redo_nonrel(request))
|
||||
};
|
||||
|
||||
let duration = start.elapsed();
|
||||
@@ -273,9 +270,9 @@ impl PostgresRedoManager {
|
||||
|
||||
debug!(
|
||||
"applied {} WAL records in {} ms to reconstruct page image at LSN {}",
|
||||
nrecords,
|
||||
request.records.len(),
|
||||
duration.as_millis(),
|
||||
lsn
|
||||
request.lsn
|
||||
);
|
||||
|
||||
if let Err(e) = apply_result {
|
||||
@@ -400,12 +397,11 @@ impl PostgresRedoProcess {
|
||||
async fn apply_wal_records(
|
||||
&mut self,
|
||||
tag: BufferTag,
|
||||
base_img: Option<Bytes>,
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[WALRecord],
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let stdin = &mut self.stdin;
|
||||
let stdout = &mut self.stdout;
|
||||
// Buffer the writes to avoid a lot of small syscalls.
|
||||
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
|
||||
|
||||
// We do three things simultaneously: send the old base image and WAL records to
|
||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
||||
@@ -415,58 +411,47 @@ impl PostgresRedoProcess {
|
||||
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
|
||||
// tokio runtime in the 'launch' function already, forwards the logging.
|
||||
let f_stdin = async {
|
||||
// Send base image, if any. (If the record initializes the page, previous page
|
||||
// version is not needed.)
|
||||
timeout(
|
||||
TIMEOUT,
|
||||
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
|
||||
)
|
||||
.await??;
|
||||
let mut capacity = 1 + BEGIN_REDO_MSG_LEN;
|
||||
if base_img.is_some() {
|
||||
timeout(
|
||||
TIMEOUT,
|
||||
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
|
||||
)
|
||||
.await??;
|
||||
capacity += 1 + PUSH_PAGE_MSG_LEN;
|
||||
}
|
||||
capacity += (1 + APPLY_MSG_HEADER_LEN) * records.len();
|
||||
capacity += records.iter().map(|rec| rec.rec.len()).sum::<usize>();
|
||||
capacity += 1 + GET_PAGE_MSG_LEN;
|
||||
|
||||
let mut buf = BytesMut::with_capacity(capacity);
|
||||
|
||||
build_begin_redo_for_block_msg(&mut buf, tag);
|
||||
|
||||
if let Some(base_img) = base_img.as_ref() {
|
||||
build_push_page_msg(&mut buf, tag, base_img);
|
||||
}
|
||||
|
||||
// Send WAL records.
|
||||
for rec in records.iter() {
|
||||
let r = rec.clone();
|
||||
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
stdin
|
||||
.write_all(&build_apply_record_msg(r.lsn, r.rec))
|
||||
.await?;
|
||||
|
||||
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
|
||||
// r.lsn >> 32, r.lsn & 0xffff_ffff);
|
||||
for record in records {
|
||||
build_apply_record_msg(&mut buf, record.lsn, &record.rec);
|
||||
}
|
||||
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
|
||||
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
|
||||
|
||||
// Send GetPage command to get the result back
|
||||
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
|
||||
build_get_page_msg(&mut buf, tag);
|
||||
|
||||
debug_assert_eq!(capacity, buf.len());
|
||||
|
||||
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
|
||||
timeout(TIMEOUT, stdin.flush()).await??;
|
||||
//debug!("sent GetPage for {}", tag.blknum);
|
||||
|
||||
Ok::<(), Error>(())
|
||||
};
|
||||
|
||||
// Read back new page image
|
||||
let f_stdout = async {
|
||||
let mut buf = [0u8; 8192];
|
||||
let mut buf = vec![0u8; 8192];
|
||||
|
||||
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
|
||||
//debug!("got response for {}", tag.blknum);
|
||||
Ok::<[u8; 8192], Error>(buf)
|
||||
Ok::<Vec<u8>, Error>(buf)
|
||||
};
|
||||
|
||||
let res = tokio::try_join!(f_stdout, f_stdin)?;
|
||||
|
||||
let buf = res.0;
|
||||
|
||||
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
|
||||
let (buf, _) = tokio::try_join!(f_stdout, f_stdin)?;
|
||||
Ok::<Bytes, Error>(Bytes::from(buf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,96 +459,57 @@ impl PostgresRedoProcess {
|
||||
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
||||
// explanation of the protocol.
|
||||
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
const TAG_LEN: usize = 4 * 4;
|
||||
const PAGE_SIZE: usize = 8192;
|
||||
const BEGIN_REDO_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
const PUSH_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN + PAGE_SIZE;
|
||||
const APPLY_MSG_HEADER_LEN: usize = 4 + 8;
|
||||
const GET_PAGE_MSG_LEN: usize = 4 + 1 + TAG_LEN;
|
||||
|
||||
fn build_begin_redo_for_block_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(len as u32);
|
||||
buf.put_u32(BEGIN_REDO_MSG_LEN as u32);
|
||||
|
||||
// FIXME: this is a temporary hack that should go away when we refactor
|
||||
// the postgres protocol serialization + handlers.
|
||||
//
|
||||
// BytesMut is a dynamic growable buffer, used a lot in tokio code but
|
||||
// not in the std library. To write to a BytesMut from a serde serializer,
|
||||
// we need to either:
|
||||
// - pre-allocate the required buffer space. This is annoying because we
|
||||
// shouldn't care what the exact serialized size is-- that's the
|
||||
// serializer's job.
|
||||
// - Or, we need to create a temporary "writer" (which implements the
|
||||
// `Write` trait). It's a bit awkward, because the writer consumes the
|
||||
// underlying BytesMut, and we need to extract it later with
|
||||
// `into_inner`.
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let buf = writer.into_inner();
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
assert!(base_img.len() == 8192);
|
||||
|
||||
let len = 4 + 1 + 4 * 4 + base_img.len();
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
fn build_push_page_msg(buf: &mut BytesMut, tag: BufferTag, base_img: &Bytes) {
|
||||
debug_assert_eq!(base_img.len(), PAGE_SIZE);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(len as u32);
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
buf.put_u32(PUSH_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let mut buf = writer.into_inner();
|
||||
buf.put(base_img);
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf.extend(base_img);
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
|
||||
let len = 4 + 8 + rec.len();
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
|
||||
fn build_apply_record_msg(buf: &mut BytesMut, endlsn: Lsn, rec: &Bytes) {
|
||||
buf.put_u8(b'A');
|
||||
|
||||
let len = APPLY_MSG_HEADER_LEN + rec.len();
|
||||
buf.put_u32(len as u32);
|
||||
|
||||
buf.put_u64(endlsn.0);
|
||||
buf.put(rec);
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
buf.extend(rec);
|
||||
}
|
||||
|
||||
fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
|
||||
fn build_get_page_msg(buf: &mut BytesMut, tag: BufferTag) {
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(len as u32);
|
||||
let mut writer = buf.writer();
|
||||
tag.ser_into(&mut writer)
|
||||
buf.put_u32(GET_PAGE_MSG_LEN as u32);
|
||||
tag.ser_into(&mut buf.writer())
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
let buf = writer.into_inner();
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
fn redo_nonrel(request: &WalRedoRequest) -> Bytes {
|
||||
let rel = request.rel;
|
||||
let blknum = request.blknum;
|
||||
let base_img = request.base_img.clone();
|
||||
let records = &request.records;
|
||||
|
||||
// Non-relational WAL records are handled here, with custom code that has the
|
||||
// same effects as the corresponding Postgres WAL redo function.
|
||||
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
let mut page = BytesMut::new();
|
||||
if let Some(fpi) = base_img {
|
||||
if let Some(fpi) = &request.base_img {
|
||||
// If full-page image is provided, then use it...
|
||||
page.extend_from_slice(&fpi[..]);
|
||||
} else {
|
||||
@@ -571,7 +517,7 @@ fn redo_nonrel(request: &WalRedoRequest) -> Bytes {
|
||||
page.extend_from_slice(&ZERO_PAGE);
|
||||
}
|
||||
// Apply all collected WAL records
|
||||
for record in records {
|
||||
for record in &request.records {
|
||||
let mut buf = record.rec.clone();
|
||||
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
Reference in New Issue
Block a user