@@ -22,24 +22,26 @@ use byteorder::{ByteOrder, LittleEndian};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
 use log::*;
-use rand::Rng;
+use nix::poll::*;
 use serde::Serialize;
 use std::fs;
-use std::fs::OpenOptions;
-use std::io::Error;
+use std::io::prelude::*;
+use std::io::{Error, ErrorKind};
+use std::os::unix::io::AsRawFd;
 use std::path::PathBuf;
 use std::process::Stdio;
+use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::sync::mpsc::{Receiver, Sender, SyncSender};
 use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
-use tokio::io::AsyncBufReadExt;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::process::{ChildStdin, ChildStdout, Command};
-use tokio::time::timeout;
 use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
+use zenith_utils::nonblock::set_nonblock;
 use zenith_utils::zid::ZTenantId;
 
 use crate::relish::*;
@@ -54,7 +56,9 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
 use postgres_ffi::pg_constants;
 use postgres_ffi::XLogRecord;
 
-const WAL_REDO_WORKERS: usize = 1;
+const N_CHANNELS: usize = 16;
+const CHANNEL_SIZE: usize = 1024 * 1024;
+type ChannelId = usize;
 
 ///
 /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -133,17 +137,22 @@ lazy_static! {
 ///
 /// This is the real implementation that uses a Postgres process to
-/// perform WAL replay. Only one thread can use the process at a time,
-/// that is controlled by the Mutex. In the future, we might want to
-/// launch a pool of processes to allow concurrent replay of multiple
-/// records.
+/// perform WAL replay. We multiplex requests from multiple threads
+/// through the `sender` channel and forward them to the postgres wal-redo
+/// process pipe from a separate thread. Responses are returned through a set
+/// of `receivers` channels, used in round-robin manner. Each receiver is
+/// protected by a mutex to prevent its use by more than one thread.
+/// In the future, we might want to launch a pool of processes to allow
+/// concurrent replay of multiple records.
 ///
 pub struct PostgresRedoManager {
-    tenantid: ZTenantId,
-    conf: &'static PageServerConf,
-
-    runtime: tokio::runtime::Runtime,
-    workers: [Mutex<Option<PostgresRedoProcess>>; WAL_REDO_WORKERS],
+    // multiplexor pipe: use a sync_channel to allow sharing the sender between
+    // multiple threads and to limit the size of the buffer
+    sender: SyncSender<(ChannelId, Vec<u8>)>,
+    // set of receiver channels
+    receivers: Vec<Mutex<Receiver<Bytes>>>,
+    // atomically incremented counter for choosing a receiver
+    round_robin: AtomicUsize,
 }
 
 #[derive(Debug)]
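The struct above replaces the old one-process-per-mutex design with a multiplexing scheme: every caller pushes a serialized request into one bounded `sync_channel` feeding a proxy thread that owns the wal-redo process, and each caller waits on one of `N_CHANNELS` response channels picked round-robin. A minimal self-contained sketch of that topology, standard library only — all names here are illustrative, not taken from the patch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};
use std::sync::{Arc, Mutex};

const N_CHANNELS: usize = 4;

struct Multiplexer {
    sender: SyncSender<(usize, Vec<u8>)>,      // request pipe, shared by all callers
    receivers: Vec<Mutex<Receiver<Vec<u8>>>>,  // one response channel per slot
    round_robin: AtomicUsize,                  // picks the next slot
}

impl Multiplexer {
    fn new() -> Arc<Multiplexer> {
        let (tx, rx) = mpsc::sync_channel(1024); // bounded: applies backpressure
        let mut senders: Vec<Sender<Vec<u8>>> = Vec::with_capacity(N_CHANNELS);
        let mut receivers = Vec::with_capacity(N_CHANNELS);
        for _ in 0..N_CHANNELS {
            let (tx, rx) = mpsc::channel();
            senders.push(tx);
            receivers.push(Mutex::new(rx));
        }
        // The proxy thread is the only owner of the (here simulated) child process.
        std::thread::spawn(move || {
            while let Ok((id, req)) = rx.recv() {
                let resp = req; // stand-in for "write to child, read page back"
                if senders[id].send(resp).is_err() {
                    break;
                }
            }
        });
        Arc::new(Multiplexer {
            sender: tx,
            receivers,
            round_robin: AtomicUsize::new(0),
        })
    }

    fn call(&self, req: Vec<u8>) -> Vec<u8> {
        let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
        // Lock the response slot *before* sending, so a second caller assigned
        // the same slot cannot steal this request's response.
        let rx = self.receivers[id].lock().unwrap();
        self.sender.send((id, req)).unwrap();
        rx.recv().unwrap()
    }
}

fn main() {
    let mux = Multiplexer::new();
    let echoed = mux.call(b"hello".to_vec());
    assert_eq!(echoed, b"hello");
}
```

The lock-before-send ordering is load-bearing, and the patch follows the same order in `apply_wal_records` below.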
@@ -156,6 +165,13 @@ struct WalRedoRequest {
     records: Vec<(Lsn, WALRecord)>,
 }
 
+impl WalRedoRequest {
+    // Can this request be served by the zenith redo functions,
+    // or does it need to be passed to the wal-redo postgres process?
+    fn can_apply_in_zenith(&self) -> bool {
+        !matches!(self.rel, RelishTag::Relation(_))
+    }
+}
+
 /// An error happened in WAL redo
 #[derive(Debug, thiserror::Error)]
 pub enum WalRedoError {
@@ -164,6 +180,8 @@ pub enum WalRedoError {
     #[error("cannot perform WAL redo now")]
     InvalidState,
+    #[error("cannot perform WAL redo for this request")]
+    InvalidRequest,
 }
 
 ///
@@ -184,10 +202,6 @@ impl WalRedoManager for PostgresRedoManager {
         base_img: Option<Bytes>,
         records: Vec<(Lsn, WALRecord)>,
     ) -> Result<Bytes, WalRedoError> {
-        let start_time;
-        let lock_time;
-        let end_time;
-
         let request = WalRedoRequest {
             rel,
             blknum,
@@ -195,30 +209,14 @@
             base_img,
             records,
         };
 
-        start_time = Instant::now();
-        let result = {
-            let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)]
-                .lock()
-                .unwrap();
-            lock_time = Instant::now();
-
-            // launch the WAL redo process on first use
-            if process_guard.is_none() {
-                let p = self
-                    .runtime
-                    .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
-                *process_guard = Some(p);
-            }
-            let process = process_guard.as_mut().unwrap();
-
-            self.runtime
-                .block_on(self.handle_apply_request(process, &request))
+        let start_time = Instant::now();
+        let result = if request.can_apply_in_zenith() {
+            self.handle_apply_request_zenith(&request)
+        } else {
+            self.handle_apply_request_postgres(&request)
         };
-        end_time = Instant::now();
-
-        WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
-        WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64());
+        let end_time = Instant::now();
+        WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
 
         result
     }
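The dispatch above is decided purely by the relish kind: ordinary relation pages need the postgres process, while everything else (CLOG, multixact and the other non-relation relishes) is replayed natively. The same rule, reduced to a stand-in enum just to make it explicit (illustrative only, not the real `RelishTag`):

```rust
#[derive(Debug)]
enum RelishTag {
    Relation(u32),
    Slru { segno: u32 },
}

fn can_apply_in_zenith(rel: &RelishTag) -> bool {
    !matches!(rel, RelishTag::Relation(_))
}

fn main() {
    assert!(!can_apply_in_zenith(&RelishTag::Relation(1663)));
    assert!(can_apply_in_zenith(&RelishTag::Slru { segno: 0 }));
}
```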
@@ -228,32 +226,109 @@ impl PostgresRedoManager {
     ///
     /// Create a new PostgresRedoManager.
     ///
-    pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
-        // We block on waiting for requests on the walredo request channel, but
-        // use async I/O to communicate with the child process. Initialize the
-        // runtime for the async part.
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-
-        // The actual process is launched lazily, on first request.
-        PostgresRedoManager {
-            runtime,
-            tenantid,
-            conf,
-            workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)),
-        }
-    }
+    pub fn new(conf: &PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
+        let (tx, rx): (
+            SyncSender<(ChannelId, Vec<u8>)>,
+            Receiver<(ChannelId, Vec<u8>)>,
+        ) = mpsc::sync_channel(CHANNEL_SIZE);
+        let mut senders: Vec<Sender<Bytes>> = Vec::with_capacity(N_CHANNELS);
+        let mut receivers: Vec<Mutex<Receiver<Bytes>>> = Vec::with_capacity(N_CHANNELS);
+        for _ in 0..N_CHANNELS {
+            let (tx, rx) = mpsc::channel();
+            senders.push(tx);
+            receivers.push(Mutex::new(rx));
+        }
+        if let Ok(mut proc) = PostgresRedoProcess::launch(conf, &tenantid) {
+            let _proxy = std::thread::spawn(move || loop {
+                let (id, data) = rx.recv().unwrap();
+                match proc.apply_wal_records(data) {
+                    Ok(page) => senders[id as usize].send(page).unwrap(),
+                    Err(err) => {
+                        info!("wal-redo failed with error {:?}", err);
+                        proc.kill();
+                        break;
+                    }
+                }
+            });
+            PostgresRedoManager {
+                sender: tx,
+                receivers,
+                round_robin: AtomicUsize::new(0),
+            }
+        } else {
+            panic!("Failed to launch wal-redo postgres");
+        }
+    }
+
+    fn apply_wal_records(
+        &self,
+        tag: BufferTag,
+        base_img: Option<Bytes>,
+        records: &[(Lsn, WALRecord)],
+    ) -> Result<Bytes, std::io::Error> {
+        // Serialize all the messages to send to the WAL redo process first.
+        //
+        // This could be problematic if there are millions of records to replay,
+        // but in practice the number of records is usually so small that it doesn't
+        // matter, and it's better to keep this code simple.
+        let mut writebuf: Vec<u8> = Vec::new();
+        build_begin_redo_for_block_msg(tag, &mut writebuf);
+        if let Some(img) = base_img {
+            build_push_page_msg(tag, &img, &mut writebuf);
+        }
+        for (lsn, rec) in records.iter() {
+            build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
+        }
+        build_get_page_msg(tag, &mut writebuf);
+        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
+
+        let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
+        let rx = self.receivers[id].lock().unwrap();
+        self.sender.send((id, writebuf)).unwrap();
+        Ok(rx.recv().unwrap())
+    }
+
+    ///
+    /// Process one request for WAL redo using wal-redo postgres
+    ///
+    fn handle_apply_request_postgres(
+        &self,
+        request: &WalRedoRequest,
+    ) -> Result<Bytes, WalRedoError> {
+        let blknum = request.blknum;
+        let lsn = request.lsn;
+        let base_img = request.base_img.clone();
+        let records = &request.records;
+        let nrecords = records.len();
+
+        let start = Instant::now();
+
+        let apply_result: Result<Bytes, Error>;
+
+        if let RelishTag::Relation(rel) = request.rel {
+            // Relational WAL records are applied using wal-redo-postgres
+            let buf_tag = BufferTag { rel, blknum };
+            apply_result = self.apply_wal_records(buf_tag, base_img, records);
+
+            let duration = start.elapsed();
+
+            debug!(
+                "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
+                nrecords,
+                duration.as_micros(),
+                lsn
+            );
+
+            apply_result.map_err(WalRedoError::IoError)
+        } else {
+            Err(WalRedoError::InvalidRequest)
+        }
+    }
 
     ///
-    /// Process one request for WAL redo.
+    /// Process one request for WAL redo using custom zenith code
     ///
-    async fn handle_apply_request(
-        &self,
-        process: &mut PostgresRedoProcess,
-        request: &WalRedoRequest,
-    ) -> Result<Bytes, WalRedoError> {
+    fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result<Bytes, WalRedoError> {
        let rel = request.rel;
        let blknum = request.blknum;
        let lsn = request.lsn;
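A detail worth noting in `new()`: the request pipe is a `sync_channel`, i.e. a *bounded* queue. If the proxy thread falls behind, callers block inside `send()` instead of queueing unbounded buffers in memory — that is what the `CHANNEL_SIZE` bound buys. A tiny self-contained demonstration of that backpressure behaviour:

```rust
use std::sync::mpsc;
use std::time::Duration;

// sync_channel(capacity) gives a bounded queue: once it holds `capacity`
// unconsumed messages, the next send() blocks until the consumer drains one.
fn main() {
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(2);
    let producer = std::thread::spawn(move || {
        for i in 0..4u8 {
            tx.send(vec![i]).unwrap(); // the 3rd send blocks until rx drains
            println!("queued request {}", i);
        }
    });
    std::thread::sleep(Duration::from_millis(100));
    while let Ok(req) = rx.recv() {
        println!("proxy handling {:?}", req);
    }
    producer.join().unwrap();
}
```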
@@ -265,178 +340,158 @@ impl PostgresRedoManager {
         let start = Instant::now();
 
         let apply_result: Result<Bytes, Error>;
-        if let RelishTag::Relation(rel) = rel {
-            // Relational WAL records are applied using wal-redo-postgres
-            let buf_tag = BufferTag { rel, blknum };
-            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
-        } else {
-            // Non-relational WAL records are handled here, with custom code that has the
-            // same effects as the corresponding Postgres WAL redo function.
-            const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
-            let mut page = BytesMut::new();
-            if let Some(fpi) = base_img {
-                // If a full-page image is provided, then use it...
-                page.extend_from_slice(&fpi[..]);
-            } else {
-                // otherwise initialize the page with zeros
-                page.extend_from_slice(&ZERO_PAGE);
-            }
-
-            // Apply all collected WAL records
-            for (_lsn, record) in records {
-                let mut buf = record.rec.clone();
-
-                WAL_REDO_RECORD_COUNTER.inc();
-
-                // 1. Parse the XLogRecord struct
-                // FIXME: refactor to avoid code duplication.
-                let xlogrec = XLogRecord::from_bytes(&mut buf);
-
-                // move to the main data
-                // TODO: probably we should store some records in our special format
-                // to avoid this weird parsing on replay
-                let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
-                if buf.remaining() > skip {
-                    buf.advance(skip);
-                }
-
-                if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
-                    // Transaction manager stuff
-                    let rec_segno = match rel {
-                        RelishTag::Slru { slru, segno } => {
-                            assert!(
-                                slru == SlruKind::Clog,
-                                "Not valid XACT relish tag {:?}",
-                                rel
-                            );
-                            segno
-                        }
-                        _ => panic!("Not valid XACT relish tag {:?}", rel),
-                    };
-                    let parsed_xact =
-                        XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info);
-                    if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT
-                        || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED
-                    {
-                        transaction_id_set_status(
-                            parsed_xact.xid,
-                            pg_constants::TRANSACTION_STATUS_COMMITTED,
-                            &mut page,
-                        );
-                        for subxact in &parsed_xact.subxacts {
-                            let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
-                            let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
-                            let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
-                            // only update xids on the requested page
-                            if rec_segno == segno && blknum == rpageno {
-                                transaction_id_set_status(
-                                    *subxact,
-                                    pg_constants::TRANSACTION_STATUS_COMMITTED,
-                                    &mut page,
-                                );
-                            }
-                        }
-                    } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT
-                        || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED
-                    {
-                        transaction_id_set_status(
-                            parsed_xact.xid,
-                            pg_constants::TRANSACTION_STATUS_ABORTED,
-                            &mut page,
-                        );
-                        for subxact in &parsed_xact.subxacts {
-                            let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
-                            let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
-                            let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
-                            // only update xids on the requested page
-                            if rec_segno == segno && blknum == rpageno {
-                                transaction_id_set_status(
-                                    *subxact,
-                                    pg_constants::TRANSACTION_STATUS_ABORTED,
-                                    &mut page,
-                                );
-                            }
-                        }
-                    }
-                } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
-                    // Multixact operations
-                    let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
-                    if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
-                        let xlrec = XlMultiXactCreate::decode(&mut buf);
-                        if let RelishTag::Slru {
-                            slru,
-                            segno: rec_segno,
-                        } = rel
-                        {
-                            if slru == SlruKind::MultiXactMembers {
-                                for i in 0..xlrec.nmembers {
-                                    let pageno =
-                                        i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-                                    let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
-                                    let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
-                                    if segno == rec_segno && rpageno == blknum {
-                                        // update only the target block
-                                        let offset = xlrec.moff + i;
-                                        let memberoff = mx_offset_to_member_offset(offset);
-                                        let flagsoff = mx_offset_to_flags_offset(offset);
-                                        let bshift = mx_offset_to_flags_bitshift(offset);
-                                        let mut flagsval =
-                                            LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
-                                        flagsval &= !(((1
-                                            << pg_constants::MXACT_MEMBER_BITS_PER_XACT)
-                                            - 1)
-                                            << bshift);
-                                        flagsval |= xlrec.members[i as usize].status << bshift;
-                                        LittleEndian::write_u32(
-                                            &mut page[flagsoff..flagsoff + 4],
-                                            flagsval,
-                                        );
-                                        LittleEndian::write_u32(
-                                            &mut page[memberoff..memberoff + 4],
-                                            xlrec.members[i as usize].xid,
-                                        );
-                                    }
-                                }
-                            } else {
-                                // Multixact offsets SLRU
-                                let offs = (xlrec.mid
-                                    % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
-                                    * 4) as usize;
-                                LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
-                            }
-                        } else {
-                            panic!();
-                        }
-                    } else {
-                        panic!();
-                    }
-                }
-            }
-
-            apply_result = Ok::<Bytes, Error>(page.freeze());
-        }
+        // Non-relational WAL records are handled here, with custom code that has the
+        // same effects as the corresponding Postgres WAL redo function.
+        const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
+        let mut page = BytesMut::new();
+        if let Some(fpi) = base_img {
+            // If a full-page image is provided, then use it...
+            page.extend_from_slice(&fpi[..]);
+        } else {
+            // otherwise initialize the page with zeros
+            page.extend_from_slice(&ZERO_PAGE);
+        }
+
+        // Apply all collected WAL records
+        for (_lsn, record) in records {
+            let mut buf = record.rec.clone();
+
+            WAL_REDO_RECORD_COUNTER.inc();
+
+            // 1. Parse the XLogRecord struct
+            // FIXME: refactor to avoid code duplication.
+            let xlogrec = XLogRecord::from_bytes(&mut buf);
+
+            // move to the main data
+            // TODO: probably we should store some records in our special format
+            // to avoid this weird parsing on replay
+            let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
+            if buf.remaining() > skip {
+                buf.advance(skip);
+            }
+
+            if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
+                // Transaction manager stuff
+                let rec_segno = match rel {
+                    RelishTag::Slru { slru, segno } => {
+                        assert!(
+                            slru == SlruKind::Clog,
+                            "Not valid XACT relish tag {:?}",
+                            rel
+                        );
+                        segno
+                    }
+                    _ => panic!("Not valid XACT relish tag {:?}", rel),
+                };
+                let parsed_xact =
+                    XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info);
+                if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT
+                    || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED
+                {
+                    transaction_id_set_status(
+                        parsed_xact.xid,
+                        pg_constants::TRANSACTION_STATUS_COMMITTED,
+                        &mut page,
+                    );
+                    for subxact in &parsed_xact.subxacts {
+                        let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
+                        let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+                        let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+                        // only update xids on the requested page
+                        if rec_segno == segno && blknum == rpageno {
+                            transaction_id_set_status(
+                                *subxact,
+                                pg_constants::TRANSACTION_STATUS_COMMITTED,
+                                &mut page,
+                            );
+                        }
+                    }
+                } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT
+                    || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED
+                {
+                    transaction_id_set_status(
+                        parsed_xact.xid,
+                        pg_constants::TRANSACTION_STATUS_ABORTED,
+                        &mut page,
+                    );
+                    for subxact in &parsed_xact.subxacts {
+                        let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
+                        let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+                        let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+                        // only update xids on the requested page
+                        if rec_segno == segno && blknum == rpageno {
+                            transaction_id_set_status(
+                                *subxact,
+                                pg_constants::TRANSACTION_STATUS_ABORTED,
+                                &mut page,
+                            );
+                        }
+                    }
+                }
+            } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
+                // Multixact operations
+                let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
+                if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
+                    let xlrec = XlMultiXactCreate::decode(&mut buf);
+                    if let RelishTag::Slru {
+                        slru,
+                        segno: rec_segno,
+                    } = rel
+                    {
+                        if slru == SlruKind::MultiXactMembers {
+                            for i in 0..xlrec.nmembers {
+                                let pageno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+                                let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+                                let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+                                if segno == rec_segno && rpageno == blknum {
+                                    // update only the target block
+                                    let offset = xlrec.moff + i;
+                                    let memberoff = mx_offset_to_member_offset(offset);
+                                    let flagsoff = mx_offset_to_flags_offset(offset);
+                                    let bshift = mx_offset_to_flags_bitshift(offset);
+                                    let mut flagsval =
+                                        LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
+                                    flagsval &=
+                                        !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
+                                            << bshift);
+                                    flagsval |= xlrec.members[i as usize].status << bshift;
+                                    LittleEndian::write_u32(
+                                        &mut page[flagsoff..flagsoff + 4],
+                                        flagsval,
+                                    );
+                                    LittleEndian::write_u32(
+                                        &mut page[memberoff..memberoff + 4],
+                                        xlrec.members[i as usize].xid,
+                                    );
+                                }
+                            }
+                        } else {
+                            // Multixact offsets SLRU
+                            let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
+                                * 4) as usize;
+                            LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
+                        }
+                    } else {
+                        panic!();
+                    }
+                } else {
+                    panic!();
+                }
+            }
+        }
+
+        apply_result = Ok::<Bytes, Error>(page.freeze());
 
         let duration = start.elapsed();
 
-        let result: Result<Bytes, WalRedoError>;
-
         debug!(
-            "applied {} WAL records in {} ms to reconstruct page image at LSN {}",
+            "zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}",
             nrecords,
             duration.as_millis(),
             lsn
         );
 
-        if let Err(e) = apply_result {
-            error!("could not apply WAL records: {:#}", e);
-            result = Err(WalRedoError::IoError(e));
-        } else {
-            let img = apply_result.unwrap();
-
-            result = Ok(img);
-        }
-
-        // The caller is responsible for sending the response
-        result
+        apply_result.map_err(WalRedoError::IoError)
     }
 }
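The native CLOG redo above only flips status bits that actually land on the page being reconstructed, which is what the `pageno`/`segno`/`rpageno` arithmetic decides. It is easier to see with concrete numbers; the constants below are the stock Postgres values (this file takes them from `pg_constants`), so treat the exact figures as illustrative:

```rust
// Stock Postgres values (see clog.c / slru.h); hedged here as illustration.
const CLOG_XACTS_PER_PAGE: u32 = 8192 * 4; // 4 two-bit statuses per byte of an 8k page
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

fn main() {
    let xid: u32 = 1_000_000;
    let pageno = xid / CLOG_XACTS_PER_PAGE;        // 30
    let segno = pageno / SLRU_PAGES_PER_SEGMENT;   // 0
    let rpageno = pageno % SLRU_PAGES_PER_SEGMENT; // 30
    // The redo loop flips this xid's 2-bit status only when (segno, rpageno)
    // matches the CLOG page currently being reconstructed.
    println!("xid {} lives in segment {}, page {} within it", xid, segno, rpageno);
}
```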
@@ -444,18 +499,17 @@ impl PostgresRedoManager {
 /// Handle to the Postgres WAL redo process
 ///
 struct PostgresRedoProcess {
+    child: Child,
     stdin: ChildStdin,
     stdout: ChildStdout,
+    stderr: ChildStderr,
 }
 
 impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    async fn launch(
-        conf: &PageServerConf,
-        tenantid: &ZTenantId,
-    ) -> Result<PostgresRedoProcess, Error> {
+    fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with a constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
@@ -476,7 +530,6 @@ impl PostgresRedoProcess {
             .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .output()
-            .await
             .expect("failed to execute initdb");
 
         if !initdb.status.success() {
@@ -513,102 +566,114 @@ impl PostgresRedoProcess {
             datadir.display()
         );
 
-        let stdin = child.stdin.take().expect("failed to open child's stdin");
-        let stderr = child.stderr.take().expect("failed to open child's stderr");
-        let stdout = child.stdout.take().expect("failed to open child's stdout");
+        let stdin = child.stdin.take().unwrap();
+        let stdout = child.stdout.take().unwrap();
+        let stderr = child.stderr.take().unwrap();
 
-        // This async block reads the child's stderr, and forwards it to the logger
-        let f_stderr = async {
-            let mut stderr_buffered = tokio::io::BufReader::new(stderr);
-
-            let mut line = String::new();
-            loop {
-                let res = stderr_buffered.read_line(&mut line).await;
-                if res.is_err() {
-                    debug!("could not convert line to utf-8");
-                    continue;
-                }
-                if res.unwrap() == 0 {
-                    break;
-                }
-                error!("wal-redo-postgres: {}", line.trim());
-                line.clear();
-            }
-            Ok::<(), Error>(())
-        };
-        tokio::spawn(f_stderr);
-
-        Ok(PostgresRedoProcess { stdin, stdout })
+        set_nonblock(stdin.as_raw_fd())?;
+        set_nonblock(stdout.as_raw_fd())?;
+        set_nonblock(stderr.as_raw_fd())?;
+
+        Ok(PostgresRedoProcess {
+            child,
+            stdin,
+            stdout,
+            stderr,
+        })
+    }
+
+    fn kill(mut self) {
+        let _ = self.child.kill();
+        if let Ok(exit_status) = self.child.wait() {
+            error!("wal-redo-postgres exited with code {}", exit_status);
+        }
+        drop(self);
     }
 
     //
     // Apply given WAL records ('records') over an old page image. Returns
     // a new page image.
     //
-    async fn apply_wal_records(
-        &mut self,
-        tag: BufferTag,
-        base_img: Option<Bytes>,
-        records: &[(Lsn, WALRecord)],
-    ) -> Result<Bytes, std::io::Error> {
-        let stdout = &mut self.stdout;
-        // Buffer the writes to avoid a lot of small syscalls.
-        let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
-
-        // We do three things simultaneously: send the old base image and WAL records to
-        // the child process's stdin, read the result from the child's stdout, and forward
-        // any logging information that the child writes to its stderr to the page server's log.
-        //
-        // 'f_stdin' handles writing the base image and WAL records to the child process.
-        // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
-        // tokio runtime in the 'launch' function already, forwards the logging.
-        let f_stdin = async {
-            // Send the base image, if any. (If the record initializes the page, the previous
-            // page version is not needed.)
-            timeout(
-                TIMEOUT,
-                stdin.write_all(&build_begin_redo_for_block_msg(tag)),
-            )
-            .await??;
-            if let Some(img) = base_img {
-                timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
-            }
-
-            // Send WAL records.
-            for (lsn, rec) in records.iter() {
-                WAL_REDO_RECORD_COUNTER.inc();
-
-                stdin
-                    .write_all(&build_apply_record_msg(*lsn, &rec.rec))
-                    .await?;
-
-                //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
-                //    r.lsn >> 32, r.lsn & 0xffff_ffff);
-            }
-            //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
-            //    records.len(), lsn >> 32, lsn & 0xffff_ffff);
-
-            // Send GetPage command to get the result back
-            timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
-            timeout(TIMEOUT, stdin.flush()).await??;
-            //debug!("sent GetPage for {}", tag.blknum);
-            Ok::<(), Error>(())
-        };
-
-        // Read back new page image
-        let f_stdout = async {
-            let mut buf = [0u8; 8192];
-
-            timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
-            //debug!("got response for {}", tag.blknum);
-            Ok::<[u8; 8192], Error>(buf)
-        };
-
-        let res = tokio::try_join!(f_stdout, f_stdin)?;
-
-        let buf = res.0;
-
-        Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
+    fn apply_wal_records(&mut self, writebuf: Vec<u8>) -> Result<Bytes, std::io::Error> {
+        let mut nwrite = self.stdin.write(&writebuf)?;
+
+        // We expect the WAL redo process to respond with an 8k page image. We read it
+        // into this buffer.
+        let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()];
+        let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
+
+        // Prepare for calling poll()
+        let mut pollfds = [
+            PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
+        ];
+
+        while nresult < pg_constants::BLCKSZ.into() {
+            // If we have more data to write, wake up if 'stdin' becomes writeable or
+            // we have data to read. Otherwise only wake up if there's data to read.
+            let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
+            let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
+
+            if n == 0 {
+                return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
+            }
+
+            // If we have some messages in stderr, forward them to the log.
+            let err_revents = pollfds[1].revents().unwrap();
+            if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
+                let mut errbuf: [u8; 16384] = [0; 16384];
+                let n = self.stderr.read(&mut errbuf)?;
+
+                // The message might not be split correctly into lines here. But this is
+                // good enough, the important thing is to get the message to the log.
+                if n > 0 {
+                    error!(
+                        "wal-redo-postgres: {}",
+                        String::from_utf8_lossy(&errbuf[0..n])
+                    );
+
+                    // To make sure we capture all log output from the process if it fails,
+                    // keep reading from the stderr before checking the stdout.
+                    continue;
+                }
+            } else if err_revents.contains(PollFlags::POLLHUP) {
+                return Err(Error::new(
+                    ErrorKind::BrokenPipe,
+                    "WAL redo process closed its stderr unexpectedly",
+                ));
+            }
+
+            // If we have more data to write and 'stdin' is writeable, do write.
+            if nwrite < writebuf.len() {
+                let in_revents = pollfds[2].revents().unwrap();
+                if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
+                    nwrite += self.stdin.write(&writebuf[nwrite..])?;
+                } else if in_revents.contains(PollFlags::POLLHUP) {
+                    // We still have more data to write, but the process closed the pipe.
+                    return Err(Error::new(
+                        ErrorKind::BrokenPipe,
+                        "WAL redo process closed its stdin unexpectedly",
+                    ));
+                }
+            }
+
+            // If we have some data in stdout, read it to the result buffer.
+            let out_revents = pollfds[0].revents().unwrap();
+            if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
+                nresult += self.stdout.read(&mut resultbuf[nresult..])?;
+            } else if out_revents.contains(PollFlags::POLLHUP) {
+                return Err(Error::new(
+                    ErrorKind::BrokenPipe,
+                    "WAL redo process closed its stdout unexpectedly",
+                ));
+            }
+        }
+
+        Ok(Bytes::from(resultbuf))
     }
 }
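The poll()-driven loop above replaces the three cooperating tokio tasks, but the reason for the structure is the same: writing the request and reading the reply must overlap. If the child blocks writing its 8k reply while we block writing records into a full stdin pipe, both sides deadlock. A self-contained miniature of the same pattern, using `cat` as a stand-in child; unlike the patch it skips `set_nonblock` for brevity and relies on poll() plus partial writes:

```rust
// Feed a child's stdin and drain its stdout concurrently with poll(),
// so neither side of the pipe pair can deadlock. Uses the nix crate,
// with the same PollFd/poll calls the patch uses.
use nix::poll::{poll, PollFd, PollFlags};
use std::io::{Read, Write};
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = child.stdout.take().unwrap();

    let writebuf = vec![b'x'; 1 << 20]; // much larger than the pipe buffer
    let (mut nwrite, mut nread) = (0usize, 0usize);
    let mut readbuf = vec![0u8; writebuf.len()];

    while nread < readbuf.len() {
        let mut fds = [
            PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
            PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
        ];
        // Only wait for writability while there is something left to write.
        let nfds = if nwrite < writebuf.len() { 2 } else { 1 };
        poll(&mut fds[0..nfds], -1).expect("poll failed");

        if fds[0].revents().unwrap().contains(PollFlags::POLLIN) {
            nread += stdout.read(&mut readbuf[nread..])?;
        }
        if nwrite < writebuf.len() && fds[1].revents().unwrap().contains(PollFlags::POLLOUT) {
            nwrite += stdin.write(&writebuf[nwrite..])?;
        }
    }
    drop(stdin);
    child.wait()?;
    Ok(())
}
```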
@@ -616,62 +681,42 @@ impl PostgresRedoProcess {
 // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
 // an explanation of the protocol.
 
-fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
+fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     let len = 4 + 1 + 4 * 4;
-    let mut buf = Vec::with_capacity(1 + len);
 
     buf.put_u8(b'B');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
+fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
     assert!(base_img.len() == 8192);
 
     let len = 4 + 1 + 4 * 4 + base_img.len();
-    let mut buf = Vec::with_capacity(1 + len);
 
     buf.put_u8(b'P');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
     buf.put(base_img);
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
+fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
     let len = 4 + 8 + rec.len();
-    let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
 
     buf.put_u8(b'A');
     buf.put_u32(len as u32);
     buf.put_u64(endlsn.0);
     buf.put(rec);
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
+fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     let len = 4 + 1 + 4 * 4;
-    let mut buf = Vec::with_capacity(1 + len);
 
     buf.put_u8(b'G');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
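All four builders share one framing convention, which can be read off the code above: a one-byte message tag ('B', 'P', 'A' or 'G'), then a big-endian u32 length that counts the length word itself plus the payload, but not the tag byte. A small self-checking illustration of the 'A' (apply record) frame, with made-up values:

```rust
use bytes::BufMut;

// Frame an 'A' message the way build_apply_record_msg does: tag byte, then a
// big-endian u32 length covering everything after the tag (4 length bytes +
// 8 LSN bytes + the record itself).
fn main() {
    let endlsn: u64 = 0x0000_0001_6B37_9A28; // e.g. LSN 1/6B379A28, made up
    let rec = [0xDEu8, 0xAD, 0xBE, 0xEF];    // stand-in WAL record body

    let len = 4 + 8 + rec.len();
    let mut buf: Vec<u8> = Vec::new();
    buf.put_u8(b'A');
    buf.put_u32(len as u32);
    buf.put_u64(endlsn);
    buf.put_slice(&rec);

    assert_eq!(buf.len(), 1 + len);
    // 41 | 00 00 00 10 | 00 00 00 01 6B 37 9A 28 | DE AD BE EF
    println!("{:02X?}", buf);
}
```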