Restore CLOG from snapshot

This commit is contained in:
Konstantin Knizhnik
2021-04-30 14:22:34 +03:00
parent 086c0ad829
commit eea6f0898e
7 changed files with 88 additions and 13 deletions

View File

@@ -160,9 +160,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
// Move the initial WAL file
fs::rename(
tmppath
.join("pg_wal")
.join("000000010000000000000001"),
tmppath.join("pg_wal").join("000000010000000000000001"),
timelinedir
.join("wal")
.join("000000010000000000000001.partial"),

View File

@@ -25,7 +25,10 @@ fn test_embedded_wal_proposer() {
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
node.append_conf("postgresql.conf", &format!("wal_acceptors='{}'\n", wal_acceptors));
node.append_conf(
"postgresql.conf",
&format!("wal_acceptors='{}'\n", wal_acceptors),
);
node.start().unwrap();
// check basic work with table

View File

@@ -33,6 +33,7 @@ use crate::page_cache::RelTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use zenith_utils::lsn::Lsn;
@@ -170,7 +171,17 @@ fn restore_snapshot(
}
}
}
for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
let entry = entry?;
restore_nonrelfile(
conf,
pcache,
timeline,
snapshot,
pg_constants::PG_XACT_FORKNUM,
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc
Ok(())
@@ -241,6 +252,64 @@ fn restore_relfile(
Ok(())
}
fn restore_nonrelfile(
_conf: &PageServerConf,
pcache: &PageCache,
_timeline: ZTimelineId,
snapshot: &str,
forknum: u32,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
// Does it look like a relation file?
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
loop {
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: forknum as u8,
},
blknum,
};
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
}
*/
}
// TODO: UnexpectedEof is expected
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
break;
}
_ => {
error!("error reading file: {:?} ({})", path, e);
break;
}
},
};
blknum += 1;
}
Ok(())
}
// Scan WAL on a timeline, starting from gien LSN, and load all the records
// into the page cache.
fn restore_wal(
@@ -312,7 +381,6 @@ fn restore_wal(
},
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,

View File

@@ -619,6 +619,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = buf.get_i32_le() as u32;
blk.will_init = true;
trace!("RM_CLOG_ID updates block {}", blk.blkno);
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {

View File

@@ -242,11 +242,13 @@ impl WalRedoManagerInternal {
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
let mut page = BytesMut::from(zero_page_bytes);
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
page.extend_from_slice(&fpi[..]);
} else {
page.extend_from_slice(&ZERO_PAGE);
}
for record in records {
let mut buf = record.rec.clone();
@@ -265,7 +267,7 @@ impl WalRedoManagerInternal {
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
page.clone_from_slice(&ZERO_PAGE);
}
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;

View File

@@ -29,6 +29,9 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_ABORT: u8 = 0x20;
// From srlu.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
/* mask for filtering opcodes out of xl_info */
pub const XLOG_XACT_OPMASK: u8 = 0x70;
/* does this record have a 'xinfo' field or not */