Do not transfer WAL to compute nodes: use pg_resetwal for node startup

Konstantin Knizhnik
2021-05-19 14:23:25 +03:00
parent b5f60f3874
commit 06f96f9600
14 changed files with 298 additions and 134 deletions


@@ -345,6 +345,10 @@ impl PostgresNode {
),
);
fs::create_dir_all(self.pgdata().join("pg_wal"))?;
fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
self.pg_resetwal(&["-f"])?;
Ok(())
}
@@ -401,6 +405,19 @@ impl PostgresNode {
Ok(())
}
fn pg_resetwal(&self, args: &[&str]) -> Result<()> {
let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
let status = Command::new(pg_resetwal_path)
.args([&["-D", self.pgdata().to_str().unwrap()], args].concat())
.status()
.with_context(|| "failed to run pg_resetwal")?;
if !status.success() {
anyhow::bail!("pg_resetwal failed");
}
Ok(())
}
pub fn start(&self) -> Result<()> {
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"])
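Taken together, compute-node startup no longer needs any WAL shipped from the page server. A minimal sketch of the resulting flow, using the methods from the diff above (restore_datadir is a hypothetical stand-in for the basebackup step):

// Sketch only: compute-node startup after this change.
fn bootstrap_node(node: &PostgresNode) -> anyhow::Result<()> {
    // 1. Data files plus a generated global/pg_control arrive via
    //    basebackup; no WAL segments are transferred anymore.
    restore_datadir(node)?; // hypothetical helper

    // 2. Postgres requires pg_wal/ (and archive_status/) to exist,
    //    even though they start out empty.
    std::fs::create_dir_all(node.pgdata().join("pg_wal").join("archive_status"))?;

    // 3. pg_resetwal -f fabricates a fresh WAL starting point that is
    //    consistent with the checkpoint recorded in pg_control, so the
    //    server can start without replaying anything.
    node.pg_resetwal(&["-f"])?;

    node.start()
}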


@@ -11,6 +11,20 @@ use integration_tests::TestStorageControlPlane;
const DOWNTIME: u64 = 2;
fn start_node_with_wal_proposer(
timeline: &str,
compute_cplane: &mut ComputeControlPlane,
wal_acceptors: &str,
) -> Arc<PostgresNode> {
let node = compute_cplane.new_test_master_node(timeline);
node.append_conf(
"postgresql.conf",
&format!("wal_acceptors='{}'\n", wal_acceptors),
);
node.start().unwrap();
node
}
#[test]
//#[ignore]
fn test_embedded_wal_proposer() {
@@ -22,12 +36,7 @@ fn test_embedded_wal_proposer() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let node = compute_cplane.new_test_master_node("main");
node.append_conf(
"postgresql.conf",
&format!("wal_acceptors='{}'\n", wal_acceptors),
);
node.start().unwrap();
let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors);
// check basic work with table
node.safe_psql(
@@ -58,11 +67,7 @@ fn test_acceptors_normal_work() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(&wal_acceptors);
let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors);
// check basic work with table
node.safe_psql(
@@ -111,10 +116,8 @@ fn test_many_timelines() {
// start postgres on each timeline
let mut nodes = Vec::new();
for tli_name in timelines {
let node = compute_cplane.new_test_node(&tli_name);
let node = start_node_with_wal_proposer(&tli_name, &mut compute_cplane, &wal_acceptors);
nodes.push(node.clone());
node.start().unwrap();
node.start_proxy(&wal_acceptors);
}
// create schema
@@ -160,11 +163,8 @@ fn test_acceptors_restarts() {
let mut rng = rand::thread_rng();
// start postgres
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors);
// start proxy
let _proxy = node.start_proxy(&wal_acceptors);
let mut failed_node: Option<usize> = None;
// check basic work with table
@@ -220,11 +220,7 @@ fn test_acceptors_unavailability() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(&wal_acceptors);
let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors);
// check basic work with table
node.safe_psql(
@@ -306,11 +302,7 @@ fn test_race_conditions() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(&wal_acceptors);
let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors);
// check basic work with table
node.safe_psql(


@@ -7,8 +7,8 @@ use tar::{Builder, Header};
use walkdir::WalkDir;
use crate::repository::{BufferTag, RelTag, Timeline};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
@@ -130,6 +130,38 @@ fn add_twophase_files(
Ok(())
}
//
// Add generated pg_control file
//
fn add_pgcontrol_file(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
if let Some(checkpoint_bytes) =
timeline.get_page_image(BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM), Lsn(0))?
{
if let Some(pg_control_bytes) = timeline.get_page_image(
BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM),
Lsn(0),
)? {
let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
checkpoint.redo = lsn.0;
// TODO: When we restart the master there are no active transactions, and
// oldestXid is equal to nextXid if there are no prepared transactions.
// Let's ignore prepared transactions for now...
checkpoint.oldestXid = checkpoint.nextXid.value as u32;
pg_control.checkPointCopy = checkpoint;
let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
ar.append(&header, &pg_control_bytes[..])?;
}
}
Ok(())
}
///
/// Generate tarball with non-relational files from repository
///
@@ -143,7 +175,6 @@ pub fn send_tarball_at_lsn(
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
for entry in WalkDir::new(&snappath) {
@@ -171,6 +202,7 @@ pub fn send_tarball_at_lsn(
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map"
&& entry.file_name() != "pg_control"
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
@@ -208,28 +240,8 @@ pub fn send_tarball_at_lsn(
)?;
add_relmap_files(&mut ar, timeline, lsn)?;
add_twophase_files(&mut ar, timeline, lsn)?;
add_pgcontrol_file(&mut ar, timeline, lsn)?;
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
ar.finish()?;
debug!("all tarred up!");
Ok(())


@@ -48,6 +48,9 @@ pub trait Timeline {
/// Does relation exist?
fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
/// Get page image at the particular LSN
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>>;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
@@ -64,7 +67,7 @@ pub trait Timeline {
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()>;
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Create a new database from a template database
///
@@ -89,7 +92,7 @@ pub trait Timeline {
decoded: DecodedWALRecord,
recdata: Bytes,
lsn: Lsn,
) -> anyhow::Result<()> {
) -> Result<()> {
// Figure out which blocks the record applies to, and "put" a separate copy
// of the record for each block.
for blk in decoded.blocks.iter() {
@@ -233,6 +236,18 @@ pub struct BufferTag {
}
impl BufferTag {
pub fn fork(forknum: u8) -> BufferTag {
BufferTag {
rel: RelTag {
forknum,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
}
}
pub fn pack(&self, buf: &mut BytesMut) {
self.rel.pack(buf);
buf.put_u32(self.blknum);
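The zero-filled fork constructor above gives non-relation metadata a stable key in the repository. A sketch of the convention used elsewhere in this commit: checkpoint and control-file images live under zenith-private fork numbers, always at block 0, with Lsn(0) acting as a "latest version" slot:

// Sketch: storing and fetching the checkpoint image under a pseudo-fork.
let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);

// Persist the current checkpoint image.
timeline.put_page_image(checkpoint_tag, Lsn(0), encode_checkpoint(checkpoint));

// Read it back later, e.g. to seed the WAL decoder on restart.
if let Some(bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? {
    let checkpoint = decode_checkpoint(bytes)?;
}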


@@ -16,7 +16,8 @@ use crate::ZTimelineId;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::*;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryInto;
@@ -340,7 +341,7 @@ impl RocksTimeline {
//
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> anyhow::Result<u32> {
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
assert!(lsn <= self.last_valid_lsn.load());
let mut key = CacheKey {
@@ -375,7 +376,8 @@ impl RocksTimeline {
Ok(0)
}
fn do_gc(&self, conf: &'static PageServerConf) -> anyhow::Result<Bytes> {
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
@@ -529,7 +531,7 @@ impl RocksTimeline {
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
@@ -541,7 +543,7 @@ impl RocksTimeline {
);
lsn = last_valid_lsn;
}
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
@@ -550,6 +552,7 @@ impl RocksTimeline {
lsn
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
@@ -646,7 +649,21 @@ impl Timeline for RocksTimeline {
break; // we are done with this fork
}
if key.lsn <= lsn {
gxacts.push(key.tag.blknum); // XID
let xid = key.tag.blknum;
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
@@ -772,7 +789,7 @@ impl Timeline for RocksTimeline {
/// Adds a relation-wide WAL record (like truncate) to the repository,
/// associating it with all pages started with specified block number
///
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()> {
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load();
let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?;
@@ -795,6 +812,24 @@ impl Timeline for RocksTimeline {
Ok(())
}
///
/// Get page image at particular LSN
///
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
let key = CacheKey { tag, lsn };
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
if let Some(bytes) = self.db.get(&key_buf[..])? {
let mut buf = BytesMut::new();
buf.extend_from_slice(&bytes);
let content = CacheEntryContent::unpack(&mut buf);
if let CacheEntryContent::PageImage(img) = content {
return Ok(Some(img));
}
}
Ok(None)
}
///
/// Memorize a full image of a page version
///
@@ -835,7 +870,7 @@ impl Timeline for RocksTimeline {
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
) -> Result<()> {
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
@@ -874,6 +909,7 @@ impl Timeline for RocksTimeline {
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let lsn = Lsn((lsn.0 + 7) & !7); // align LSN to an 8-byte boundary
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
@@ -891,7 +927,8 @@ impl Timeline for RocksTimeline {
/// NOTE: this updates last_valid_lsn as well.
///
fn advance_last_record_lsn(&self, lsn: Lsn) {
// Can't move backwards.
let lsn = Lsn((lsn.0 + 7) & !7); // align LSN to an 8-byte boundary
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);


@@ -26,9 +26,9 @@ use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
///
@@ -124,8 +124,8 @@ fn restore_snapshot(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
"0",
0,
0,
pg_constants::PG_CONTROLFILE_FORKNUM,
0,
@@ -403,13 +403,23 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
let mut waldecoder = WalStreamDecoder::new(startpoint);
const SEG_SIZE: u64 = 16 * 1024 * 1024;
let mut segno = startpoint.segment_number(SEG_SIZE);
let mut offset = startpoint.segment_offset(SEG_SIZE);
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = Lsn(0);
let mut checkpoint = CheckPoint::new(startpoint.0, 1);
let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
let pg_control_tag = BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM);
if let Some(pg_control_bytes) = timeline.get_page_image(pg_control_tag, Lsn(0))? {
let pg_control = decode_pg_control(pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy.clone();
} else {
error!("No control file is found in reposistory");
}
loop {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, 16 * 1024 * 1024);
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
let mut path = walpath.clone() + "/" + &filename;
// The segment may exist with a .partial suffix
@@ -432,7 +442,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
let mut buf = Vec::new();
let nread = file.read_to_end(&mut buf)?;
if nread != 16 * 1024 * 1024 - offset as usize {
if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize {
// Maybe allow this for .partial files?
error!("read only {} bytes from WAL file", nread);
}
@@ -447,7 +457,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(recdata.clone());
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
last_lsn = lsn;
} else {
@@ -462,6 +472,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
offset = 0;
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = encode_checkpoint(checkpoint);
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
Ok(())
}


@@ -1,25 +1,32 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
use postgres_ffi::*;
use std::cmp::min;
use std::str;
use thiserror::Error;
use zenith_utils::lsn::Lsn;
// FIXME: this is configurable in PostgreSQL, 16 MB is the default
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
pub type Oid = u32;
pub type TransactionId = u32;
pub type BlockNumber = u32;
pub type OffsetNumber = u16;
pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
pub type TimeLineID = u32;
pub type PgTime = i64;
// From PostgreSQL headers
#[repr(C)]
#[derive(Debug)]
pub struct XLogPageHeaderData {
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: u32, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: TimeLineID, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
}
// FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end
@@ -92,7 +99,7 @@ impl WalStreamDecoder {
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
loop {
// parse and verify page boundaries as we go
if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
// parse long header
if self.inputbuf.remaining() < SizeOfXLogLongPHD {
@@ -185,7 +192,8 @@ impl WalStreamDecoder {
let xlogrec = XLogRecord::from_bytes(&mut buf);
if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32;
self.padlen =
self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32;
} else {
// Pad to an 8-byte boundary
self.padlen = self.lsn.calc_padding(8u32) as u32;
@@ -306,14 +314,6 @@ pub struct DecodedWALRecord {
pub main_data_offset: usize,
}
pub type Oid = u32;
pub type TransactionId = u32;
pub type BlockNumber = u32;
pub type OffsetNumber = u16;
pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
@@ -549,7 +549,7 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -567,7 +567,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
xlogrec.xl_rmid,
xlogrec.xl_info
);
if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
// TODO: handle XID wraparound
checkpoint.nextXid = FullTransactionId {
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
};
}
let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
if buf.remaining() != remaining as usize {
@@ -1055,8 +1060,29 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blk.blkno = blkno;
blocks.push(blk);
}
if xlrec.mid > checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid;
}
if xlrec.moff > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff;
}
let max_xid = xlrec
.members
.iter()
.fold(checkpoint.nextXid.value as u32, |acc, mbr| {
if mbr.xid > acc {
mbr.xid
} else {
acc
}
});
checkpoint.nextXid = FullTransactionId {
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | max_xid as u64,
};
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
checkpoint.oldestXid = xlrec.end_trunc_off;
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let first_off_blkno =
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno =
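The nextXid bookkeeping above splices the 32-bit XID from each WAL record into the low half of the 64-bit FullTransactionId, leaving the epoch in the high half untouched (XID wraparound is explicitly a TODO). The idea in isolation, as a sketch:

// Sketch: advance nextXid from an on-disk 32-bit xid, preserving the epoch.
fn advance_next_xid(next_xid: &mut FullTransactionId, xid: u32) {
    if xid > next_xid.value as u32 {
        // High 32 bits: epoch, unchanged. Low 32 bits: the newer xid.
        next_xid.value = (next_xid.value & 0xFFFF_FFFF_0000_0000) | xid as u64;
    }
}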


@@ -7,6 +7,7 @@
//!
use crate::page_cache;
use crate::repository::*;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -17,6 +18,7 @@ use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::collections::HashMap;
@@ -145,6 +147,11 @@ fn walreceiver_main(
error!("No previous WAL position");
}
startpoint = Lsn::max(
startpoint,
Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
);
// There might be some padding after the last full record, skip it.
//
// FIXME: It probably would be better to always start streaming from the beginning
@@ -164,6 +171,14 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut checkpoint = CheckPoint::new(startpoint.0, identify.timeline);
let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
if let Some(checkpoint_bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? {
checkpoint = decode_checkpoint(checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
} else {
error!("No checkpoint record was found in reposistory");
}
while let Some(replication_message) = physical_stream.next()? {
match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
@@ -173,21 +188,21 @@ fn walreceiver_main(
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
write_wal_file(
startlsn,
timelineid,
16 * 1024 * 1024, // FIXME
data,
)?;
write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
let old_checkpoint_bytes = encode_checkpoint(checkpoint);
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
let new_checkpoint_bytes = encode_checkpoint(checkpoint);
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
@@ -299,7 +314,7 @@ fn write_wal_file(
let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
while bytes_left != 0 {
let bytes_to_write;
@@ -315,7 +330,7 @@ fn write_wal_file(
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size as u64);
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
@@ -367,7 +382,7 @@ fn write_wal_file(
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size as u64) == 0 {
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
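Note that the receive loop above persists the checkpoint image only when a decoded record actually changed it: re-encoding before and after and comparing the bytes is a cheap change detector. A condensed sketch of the pattern:

// Sketch: write the checkpoint metadata page only when a record moved it.
let before = encode_checkpoint(checkpoint);
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
let after = encode_checkpoint(checkpoint);
if after != before {
    timeline.put_page_image(checkpoint_tag, Lsn(0), after);
}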


@@ -39,6 +39,7 @@ use crate::repository::BufferTag;
use crate::repository::WALRecord;
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
@@ -241,22 +242,6 @@ impl PostgresRedoManagerInternal {
}
}
fn transaction_id_set_status_bit(&self, xid: u32, status: u8, page: &mut BytesMut) {
trace!(
"handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort, 3-sub_commit)",
status
);
let byteno: usize = ((xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
let bshift: u8 = ((xid % pg_constants::CLOG_XACTS_PER_BYTE)
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
page[byteno] =
(page[byteno] & !(pg_constants::CLOG_XACT_BITMASK << bshift)) | (status << bshift);
}
///
/// Process one request for WAL redo.
///
@@ -308,7 +293,7 @@ impl PostgresRedoManagerInternal {
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page);
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
@@ -328,13 +313,13 @@ impl PostgresRedoManagerInternal {
// only update xids on the requested page
if tag.blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED;
self.transaction_id_set_status_bit(subxact, status, &mut page);
transaction_id_set_status(subxact, status, &mut page);
}
}
}
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page);
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
@@ -354,7 +339,7 @@ impl PostgresRedoManagerInternal {
// only update xids on the requested page
if tag.blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
self.transaction_id_set_status_bit(subxact, status, &mut page);
transaction_id_set_status(subxact, status, &mut page);
}
}
}
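The removed bit twiddling above (and its read-side counterpart used in the oldestActiveXid scan) both reduce to the same CLOG addressing scheme. A sketch of the read side, using the pg_constants values added in this commit (the real helper lives in postgres_ffi::nonrelfile_utils):

// Sketch: locate an xid's 2-bit commit status inside a CLOG page.
fn clog_status(xid: u32, page: &[u8]) -> u8 {
    // Which CLOG page holds this xid (used as BufferTag::blknum above).
    let _blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
    // Byte within the page, then bit position within that byte.
    let byteno = ((xid % pg_constants::CLOG_XACTS_PER_PAGE)
        / pg_constants::CLOG_XACTS_PER_BYTE) as usize;
    let bshift = (xid % pg_constants::CLOG_XACTS_PER_BYTE)
        * pg_constants::CLOG_BITS_PER_XACT as u32;
    (page[byteno] >> bshift) & pg_constants::CLOG_XACT_BITMASK
}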


@@ -18,6 +18,8 @@ fn main() {
// included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks))
.whitelist_type("ControlFileData")
.whitelist_type("CheckPoint")
.whitelist_type("FullTransactionId")
.whitelist_var("PG_CONTROL_FILE_SIZE")
.whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.whitelist_type("DBState")


@@ -3,6 +3,7 @@
#![allow(non_snake_case)]
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod xlog_utils;
@@ -11,6 +12,7 @@ use bytes::{Buf, Bytes, BytesMut};
// sizeof(ControlFileData)
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize;
impl ControlFileData {
@@ -69,3 +71,42 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
buf.into()
}
pub fn encode_checkpoint(checkpoint: CheckPoint) -> Bytes {
let b = unsafe { std::mem::transmute::<CheckPoint, [u8; SIZEOF_CHECKPOINT]>(checkpoint) };
Bytes::copy_from_slice(&b[..])
}
pub fn decode_checkpoint(mut buf: Bytes) -> Result<CheckPoint, anyhow::Error> {
let mut b = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut b);
let checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) };
Ok(checkpoint)
}
impl CheckPoint {
pub fn new(lsn: u64, timeline: u32) -> CheckPoint {
CheckPoint {
redo: lsn,
ThisTimeLineID: timeline,
PrevTimeLineID: timeline,
fullPageWrites: true, // TODO: get actual value of full_page_writes
nextXid: FullTransactionId {
value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64,
}, // TODO: handle epoch?
nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID,
nextMulti: 1,
nextMultiOffset: 0,
oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID,
oldestXidDB: 0,
oldestMulti: 1,
oldestMultiDB: 0,
time: 0,
oldestCommitTsXid: 0,
newestCommitTsXid: 0,
oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID,
}
}
}
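A round-trip sketch of the transmute-based helpers above. They rely on CheckPoint being the bindgen-generated #[repr(C)] mirror of PostgreSQL's struct (and Copy, as bindgen derives for plain C structs), so the byte image matches what the server itself writes:

// Sketch: CheckPoint <-> bytes round trip.
let cp = CheckPoint::new(0x0100_0000, 1);
let bytes = encode_checkpoint(cp);
assert_eq!(bytes.len(), SIZEOF_CHECKPOINT);

let decoded = decode_checkpoint(bytes).unwrap();
assert_eq!(decoded.redo, 0x0100_0000);
assert_eq!(decoded.ThisTimeLineID, 1);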


@@ -23,15 +23,21 @@ pub const PG_XACT_FORKNUM: u8 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
pub const PG_TWOPHASE_FORKNUM: u8 = 47;
pub const PG_CHECKPOINT_FORKNUM: u8 = 48;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// constants from clog.h
//
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
@@ -42,6 +48,7 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
pub const BITS_PER_HEAPBLOCK: u16 = 2;
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
@@ -136,11 +143,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = 24;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// from xlogrecord.h
//
@@ -162,3 +164,12 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
/* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;
pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;


@@ -51,13 +51,13 @@ impl Lsn {
}
/// Compute the offset into a segment
pub fn segment_offset(self, seg_sz: u64) -> u64 {
self.0 % seg_sz
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
}
/// Compute the segment number
pub fn segment_number(self, seg_sz: u64) -> u64 {
self.0 / seg_sz
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64
}
/// Compute the offset into a block
@@ -230,7 +230,7 @@ mod tests {
assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
assert_eq!(Lsn(1234).checked_sub(1235u64), None);
let seg_sz = 16u64 * 1024 * 1024;
let seg_sz: usize = 16 * 1024 * 1024;
assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
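A worked example of the usize-based helpers (a sketch; WAL_SEGMENT_SIZE is the 16 MiB default from pg_constants):

// Sketch: segment arithmetic with the default 16 MiB WAL segment size.
let seg_sz = pg_constants::WAL_SEGMENT_SIZE; // 16 * 1024 * 1024
let lsn = Lsn(0x0200_0ABC); // two full segments plus 0xABC bytes

assert_eq!(lsn.segment_number(seg_sz), 2); // third segment, zero-based
assert_eq!(lsn.segment_offset(seg_sz), 0xABC);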