Implement WAL redo.

When a page is requested from the page cache (GetPage@LSN), launch postgres
in special "WAL redo" mode to reconstruct that page version.

Plus a bunch of misc fixes to the WAL decoding code.
This commit is contained in:
Heikki Linnakangas
2021-03-17 22:29:21 +02:00
committed by Stas Kelvich
parent 626b4e9987
commit af7ebb6395
7 changed files with 329 additions and 60 deletions

1
Cargo.lock generated
View File

@@ -343,6 +343,7 @@ dependencies = [
"bytes",
"lazy_static",
"postgres-protocol",
"rand",
"tokio",
"tokio-postgres",
"tokio-stream",

View File

@@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8.3"
bytes = "1.0.1"
byteorder = "1.4.3"
lazy_static = "1.4.0"

View File

@@ -1,11 +1,13 @@
use std::thread;
mod page_cache;
mod page_service;
mod waldecoder;
mod walreceiver;
mod page_service;
mod walredo;
use std::io::Error;
use std::time::Duration;
fn main() -> Result<(), Error> {
let mut threads = Vec::new();
@@ -25,11 +27,18 @@ fn main() -> Result<(), Error> {
});
threads.push(page_server_thread);
// Since the GetPage@LSN network interface isn't working yet, mock that
// by calling the GetPage@LSN function with a random block every 5 seconds.
loop {
thread::sleep(Duration::from_secs(5));
// never returns.
for t in threads {
t.join().unwrap()
page_cache::test_get_page_at_lsn();
}
Ok(())
// never returns.
//for t in threads {
// t.join().unwrap()
//}
//let _unused = handler.join(); // never returns.
//Ok(())
}

View File

@@ -1,8 +1,12 @@
use std::collections::BTreeMap;
use std::sync::Mutex;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
#[derive(PartialEq, Eq, PartialOrd, Ord)]
use crate::walredo;
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct BufferTag {
pub spcnode: u32,
pub dbnode: u32,
@@ -11,14 +15,10 @@ pub struct BufferTag {
pub blknum: u32,
}
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64
}
#[derive(Clone)]
pub struct WALRecord {
pub lsn: u64,
pub will_init: bool,
pub rec: Bytes
}
@@ -35,26 +35,71 @@ pub struct WALRecord {
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64
}
#[derive(Clone)]
enum CacheEntry {
PageImage(Bytes),
PageImage {
img: Bytes
},
WALRecord {
will_init: bool,
rec: Bytes
},
WALRecord(WALRecord)
}
lazy_static! {
static ref PAGECACHE: BTreeMap<CacheKey, CacheEntry> = BTreeMap::new();
static ref PAGECACHE: Mutex<BTreeMap<CacheKey, CacheEntry>> = Mutex::new(BTreeMap::new());
}
// Public interface functions
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn test_get_page_at_lsn()
{
// for quick testing of the get_page_at_lsn() funcion.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
let mut tag: Option<BufferTag> = None;
{
let pagecache = PAGECACHE.lock().unwrap();
if pagecache.is_empty() {
println!("page cache is empty");
return;
}
// Find nth entry in the map, where
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
}
i +=1;
}
}
println!("testing GetPage@LSN: {}", tag.unwrap().blknum);
get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee);
}
//
// GetPage@LSN
//
@@ -69,20 +114,66 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64)
// to the latest page image. Then apply all the WAL records up until the
// given LSN.
//
let minkey = CacheKey {
tag: tag,
lsn: 0
};
let maxkey = CacheKey {
tag: tag,
lsn: lsn + 1
};
let pagecache = PAGECACHE.lock().unwrap();
// PAGECACHE.get(&tag);
let entries = pagecache.range(&minkey .. &maxkey);
let mut records: Vec<WALRecord> = Vec::new();
let mut base_img: Option<Bytes> = None;
for (key, e) in entries.rev() {
match e {
CacheEntry::PageImage(img) => {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
}
CacheEntry::WALRecord(rec) => {
records.push(rec.clone());
if rec.will_init {
println!("WAL record at LSN {} initializes the page", rec.lsn);
}
}
}
}
if !records.is_empty() {
records.reverse();
walredo::apply_wal_records(tag, base_img, &records).expect("could not apply WAL records");
println!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn);
}
}
//
// Add WAL record
//
#[allow(dead_code)]
#[allow(unused_variables)]
pub fn put_wal_record(tag: BufferTag, lsn: u64, rec: Bytes)
pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
{
let key = CacheKey {
tag: tag,
lsn: rec.lsn
};
let entry = CacheEntry::WALRecord(rec);
let mut pagecache = PAGECACHE.lock().unwrap();
let oldentry = pagecache.insert(key, entry);
assert!(oldentry.is_none());
}

View File

@@ -4,9 +4,7 @@
//#![allow(dead_code)]
//include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
use bytes::{Buf, BufMut, BytesMut};
use crate::page_cache::WALRecord;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::cmp::min;
@@ -82,7 +80,7 @@ impl WalStreamDecoder {
self.inputbuf.extend_from_slice(buf);
}
pub fn poll_decode(&mut self) -> Option<WALRecord> {
pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> {
loop {
// parse and verify page boundaries as we go
@@ -91,12 +89,12 @@ impl WalStreamDecoder {
if self.lsn % WAL_SEGMENT_SIZE == 0 {
// parse long header
if self.inputbuf.remaining() < SizeOfXLogShortPHD {
if self.inputbuf.remaining() < SizeOfXLogLongPHD {
return None;
}
self.decode_XLogLongPageHeaderData();
self.lsn += SizeOfXLogShortPHD as u64;
self.lsn += SizeOfXLogLongPHD as u64;
// TODO: verify the fields in the header
@@ -105,12 +103,12 @@ impl WalStreamDecoder {
} else if self.lsn % (XLOG_BLCKSZ as u64) == 0 {
// parse page header
if self.inputbuf.remaining() < SizeOfXLogLongPHD {
if self.inputbuf.remaining() < SizeOfXLogShortPHD {
return None;
}
self.decode_XLogPageHeaderData();
self.lsn += SizeOfXLogLongPHD as u64;
self.lsn += SizeOfXLogShortPHD as u64;
// TODO: verify the fields in the header
@@ -168,10 +166,7 @@ impl WalStreamDecoder {
if self.contlen == 0 {
let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new());
let result = WALRecord {
lsn: self.reclsn,
rec: recordbuf.freeze(),
};
let result = (self.reclsn, recordbuf.freeze());
if self.lsn % 8 != 0 {
self.padlen = 8 - (self.lsn % 8) as u32;
@@ -194,7 +189,7 @@ impl WalStreamDecoder {
#[allow(non_snake_case)]
fn decode_XLogPageHeaderData(&mut self) -> XLogPageHeaderData {
let buf = &mut self.recordbuf;
let buf = &mut self.inputbuf;
// FIXME: Assume little-endian
@@ -205,6 +200,11 @@ impl WalStreamDecoder {
xlp_pageaddr: buf.get_u64_le(),
xlp_rem_len: buf.get_u32_le()
};
// 4 bytes of padding, on 64-bit systems
buf.advance(4);
// FIXME: check that hdr.xlp_rem_len matches self.contlen
//println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len);
return hdr;
}
@@ -214,7 +214,6 @@ impl WalStreamDecoder {
let hdr : XLogLongPageHeaderData = XLogLongPageHeaderData {
std: self.decode_XLogPageHeaderData(),
// FIXME: eat padding
xlp_sysid: self.recordbuf.get_u64_le(),
xlp_seg_size: self.recordbuf.get_u32_le(),
xlp_xlog_blcksz: self.recordbuf.get_u32_le(),
@@ -241,7 +240,7 @@ const BKPBLOCK_FORK_MASK:u8 = 0x0F;
const _BKPBLOCK_FLAG_MASK:u8 = 0xF0;
const BKPBLOCK_HAS_IMAGE:u8 = 0x10; /* block data is an XLogRecordBlockImage */
const BKPBLOCK_HAS_DATA:u8 = 0x20;
const _BKPBLOCK_WILL_INIT:u8 = 0x40; /* redo will re-init the page */
const BKPBLOCK_WILL_INIT:u8 = 0x40; /* redo will re-init the page */
const BKPBLOCK_SAME_REL:u8 = 0x80; /* RelFileNode omitted, same as previous */
/* Information stored in bimg_info */
@@ -250,23 +249,24 @@ const BKPIMAGE_IS_COMPRESSED:u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY:u8 = 0x04; /* page image should be restored during replay */
struct DecodedBkpBlock {
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
//in_use: bool,
/* Identify the block this refers to */
rnode_spcnode: u32,
rnode_dbnode: u32,
rnode_relnode: u32,
forknum: u8,
blkno: u32,
pub rnode_spcnode: u32,
pub rnode_dbnode: u32,
pub rnode_relnode: u32,
pub forknum: u8,
pub blkno: u32,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
flags: u8,
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
apply_image: bool, /* has image that should be restored */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool,
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
@@ -282,12 +282,19 @@ struct DecodedBkpBlock {
#[allow(non_upper_case_globals)]
const SizeOfXLogRecord:u32 = 24;
pub struct DecodedWALRecord {
pub lsn: u64,
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>
}
//
// Routines to decode a WAL record and figure out which blocks are modified
//
pub fn decode_wal_record(rec: &WALRecord) {
pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
let mut buf = rec.rec.clone();
let mut buf = rec.clone();
// FIXME: assume little-endian here
let xl_tot_len = buf.get_u32_le();
@@ -315,6 +322,7 @@ pub fn decode_wal_record(rec: &WALRecord) {
let mut max_block_id = 0;
let mut datatotal: u32 = 0;
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
while buf.remaining() > datatotal as usize {
let block_id = buf.get_u8();
@@ -355,6 +363,7 @@ pub fn decode_wal_record(rec: &WALRecord) {
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
@@ -376,14 +385,12 @@ pub fn decode_wal_record(rec: &WALRecord) {
}
max_block_id = block_id;
//blk.in_use = true; // FIXME: pointless
//blk.apply_image = false;
fork_flags = buf.get_u8();
blk.forknum = fork_flags & BKPBLOCK_FORK_MASK;
blk.flags = fork_flags;
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
blk.data_len = buf.get_u16_le();
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
@@ -532,6 +539,8 @@ pub fn decode_wal_record(rec: &WALRecord) {
blk.blkno = buf.get_u32_le();
println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno);
blocks.push(blk);
}
_ => {
@@ -551,5 +560,10 @@ pub fn decode_wal_record(rec: &WALRecord) {
*/
// Since we don't care about the data payloads here, we're done.
return DecodedWALRecord {
lsn: lsn,
record: rec,
blocks: blocks
}
}

View File

@@ -2,6 +2,8 @@ use tokio_stream::StreamExt;
use tokio::runtime;
use crate::waldecoder::WalStreamDecoder;
use crate::page_cache;
use crate::page_cache::BufferTag;
use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode};
use postgres_protocol::message::backend::ReplicationMessage;
@@ -69,16 +71,36 @@ pub async fn walreceiver_main() -> Result<(), Error> {
waldecoder.feed_bytes(xlog_data.data());
loop {
let rec = waldecoder.poll_decode();
if let Some((lsn, recdata)) = waldecoder.poll_decode() {
if rec.is_none() {
let decoded = crate::waldecoder::decode_wal_record(lsn, recdata.clone());
println!("decoded record");
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifes. (The actual WAL record is kept in
// a Bytes, which uses a reference counter for the underlying buffer,
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u32,
blknum: blk.blkno
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone()
};
page_cache::put_wal_record(tag, rec);
}
} else {
break;
}
crate::waldecoder::decode_wal_record(&rec.unwrap());
println!("decoded record");
// TODO: Put the WAL record to the page cache
}
}
ReplicationMessage::PrimaryKeepAlive(_keepalive) => {

131
src/walredo.rs Normal file
View File

@@ -0,0 +1,131 @@
//
// WAL redo
//
// We rely on Postgres to perform WAL redo for us. We launch
// a postgres process in special "wal redo" mode that's similar
// to single-user mode. We then pass the WAL record and the previous
// page image, if any, to the postgress process, and ask the
// process to apply it. Then we get the page image back. Communication
// with the process happens via stdin/stdout
//
// TODO: Even though the postgres code runs in a separate process,
// it's not a secure sandbox.
//
use std::process::{Command, Stdio};
use std::io::{Read, Write, Error};
use std::assert;
use bytes::{Bytes, BytesMut, BufMut};
use crate::page_cache::BufferTag;
use crate::page_cache::WALRecord;
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
//
// FIXME: This is completely untested ATM. Will surely crash and burn.
//
pub fn apply_wal_records(tag: BufferTag, base_img: Option<Bytes>, records: &Vec<WALRecord>) -> Result<Bytes, Error>
{
//
// Start postgres binary in special WAL redo mode.
//
let mut child =
Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("postgres --wal-redo command failed to start");
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
stdin.write(&build_begin_redo_for_block_msg(tag))?;
if base_img.is_some() {
stdin.write(&build_push_page_msg(tag, base_img.unwrap()))?;
}
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
stdin.write(&build_apply_record_msg(r.lsn, r.rec))?;
}
// Read back new page image
stdin.write(&build_get_page_msg(tag))?;
let mut buf = vec![0u8; 8192];
child.stdout.unwrap().read_exact(&mut buf)?;
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
return Ok(Bytes::from(buf));
}
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes
{
let mut buf = BytesMut::new();
buf.put_u8('B' as u8);
buf.put_u32(4 + 5*4);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.blknum);
return buf.freeze();
}
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
{
assert!(base_img.len() == 8192);
let mut buf = BytesMut::new();
buf.put_u8('P' as u8);
buf.put_u32(4 + 5*4 + base_img.len() as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.blknum);
buf.put(base_img);
return buf.freeze();
}
fn build_apply_record_msg(lsn: u64, rec: Bytes) -> Bytes {
let mut buf = BytesMut::new();
buf.put_u8('A' as u8);
buf.put_u32(4 + 8 + rec.len() as u32);
buf.put_u64(lsn);
buf.put(rec);
return buf.freeze();
}
fn build_get_page_msg(tag: BufferTag, ) -> Bytes {
let mut buf = BytesMut::new();
buf.put_u8('G' as u8);
buf.put_u32(4 + 5*4);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum);
buf.put_u32(tag.blknum);
return buf.freeze();
}