diff --git a/Cargo.lock b/Cargo.lock index c3f688abb2..264165b622 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,6 +343,7 @@ dependencies = [ "bytes", "lazy_static", "postgres-protocol", + "rand", "tokio", "tokio-postgres", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 4c66c14caa..145aa5ec47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +rand = "0.8.3" bytes = "1.0.1" byteorder = "1.4.3" lazy_static = "1.4.0" diff --git a/src/main.rs b/src/main.rs index 187bdcee4d..8bf7ac9b38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ use std::thread; mod page_cache; +mod page_service; mod waldecoder; mod walreceiver; -mod page_service; +mod walredo; use std::io::Error; +use std::time::Duration; fn main() -> Result<(), Error> { let mut threads = Vec::new(); @@ -25,11 +27,18 @@ fn main() -> Result<(), Error> { }); threads.push(page_server_thread); + // Since the GetPage@LSN network interface isn't working yet, mock that + // by calling the GetPage@LSN function with a random block every 5 seconds. + loop { + thread::sleep(Duration::from_secs(5)); - // never returns. - for t in threads { - t.join().unwrap() + page_cache::test_get_page_at_lsn(); } - Ok(()) + // never returns. + //for t in threads { + // t.join().unwrap() + //} + //let _unused = handler.join(); // never returns. 
+ //Ok(()) } diff --git a/src/page_cache.rs b/src/page_cache.rs index 6329505e67..be63c05b14 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -1,8 +1,12 @@ use std::collections::BTreeMap; +use std::sync::Mutex; use bytes::Bytes; use lazy_static::lazy_static; +use rand::Rng; -#[derive(PartialEq, Eq, PartialOrd, Ord)] +use crate::walredo; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub struct BufferTag { pub spcnode: u32, pub dbnode: u32, @@ -11,14 +15,10 @@ pub struct BufferTag { pub blknum: u32, } -#[derive(PartialEq, Eq, PartialOrd, Ord)] -pub struct CacheKey { - pub tag: BufferTag, - pub lsn: u64 -} - +#[derive(Clone)] pub struct WALRecord { pub lsn: u64, + pub will_init: bool, pub rec: Bytes } @@ -35,26 +35,71 @@ pub struct WALRecord { // stored directly in the cache entry in that you still need to run the WAL redo // routine to generate the page image. // +#[derive(PartialEq, Eq, PartialOrd, Ord)] +pub struct CacheKey { + pub tag: BufferTag, + pub lsn: u64 +} + +#[derive(Clone)] enum CacheEntry { + PageImage(Bytes), - PageImage { - img: Bytes - }, - - WALRecord { - will_init: bool, - rec: Bytes - }, + WALRecord(WALRecord) } lazy_static! { - static ref PAGECACHE: BTreeMap = BTreeMap::new(); + static ref PAGECACHE: Mutex> = Mutex::new(BTreeMap::new()); } // Public interface functions + +// +// Simple test function for the WAL redo code: +// +// 1. Pick a page from the page cache at random. +// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) +// +// +pub fn test_get_page_at_lsn() +{ + // for quick testing of the get_page_at_lsn() function. + // + // Get a random page from the page cache. Apply all its WAL, by requesting + // that page at the highest lsn. 
+ + let mut tag: Option = None; + + { + let pagecache = PAGECACHE.lock().unwrap(); + + if pagecache.is_empty() { + println!("page cache is empty"); + return; + } + + // Find nth entry in the map, where + let n = rand::thread_rng().gen_range(0..pagecache.len()); + let mut i = 0; + for (key, _e) in pagecache.iter() { + if i == n { + tag = Some(key.tag); + break; + } + i +=1; + } + } + + println!("testing GetPage@LSN: {}", tag.unwrap().blknum); + + get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee); + +} + + // // GetPage@LSN // @@ -69,20 +114,66 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) // to the latest page image. Then apply all the WAL records up until the // given LSN. // + let minkey = CacheKey { + tag: tag, + lsn: 0 + }; + let maxkey = CacheKey { + tag: tag, + lsn: lsn + 1 + }; + let pagecache = PAGECACHE.lock().unwrap(); - // PAGECACHE.get(&tag); + let entries = pagecache.range(&minkey .. &maxkey); + let mut records: Vec = Vec::new(); + + let mut base_img: Option = None; + + for (key, e) in entries.rev() { + match e { + CacheEntry::PageImage(img) => { + // We have a base image. 
No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } + CacheEntry::WALRecord(rec) => { + records.push(rec.clone()); + + if rec.will_init { + println!("WAL record at LSN {} initializes the page", rec.lsn); + } + } + } + } + + if !records.is_empty() { + records.reverse(); + + walredo::apply_wal_records(tag, base_img, &records).expect("could not apply WAL records"); + + println!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn); + } } - - // // Add WAL record // #[allow(dead_code)] #[allow(unused_variables)] -pub fn put_wal_record(tag: BufferTag, lsn: u64, rec: Bytes) +pub fn put_wal_record(tag: BufferTag, rec: WALRecord) { + let key = CacheKey { + tag: tag, + lsn: rec.lsn + }; + let entry = CacheEntry::WALRecord(rec); + + let mut pagecache = PAGECACHE.lock().unwrap(); + + let oldentry = pagecache.insert(key, entry); + assert!(oldentry.is_none()); } diff --git a/src/waldecoder.rs b/src/waldecoder.rs index 7491950bfb..3b368efc0f 100644 --- a/src/waldecoder.rs +++ b/src/waldecoder.rs @@ -4,9 +4,7 @@ //#![allow(dead_code)] //include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -use bytes::{Buf, BufMut, BytesMut}; - -use crate::page_cache::WALRecord; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::cmp::min; @@ -82,7 +80,7 @@ impl WalStreamDecoder { self.inputbuf.extend_from_slice(buf); } - pub fn poll_decode(&mut self) -> Option { + pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> { loop { // parse and verify page boundaries as we go @@ -91,12 +89,12 @@ impl WalStreamDecoder { if self.lsn % WAL_SEGMENT_SIZE == 0 { // parse long header - if self.inputbuf.remaining() < SizeOfXLogShortPHD { + if self.inputbuf.remaining() < SizeOfXLogLongPHD { return None; } self.decode_XLogLongPageHeaderData(); - self.lsn += SizeOfXLogShortPHD as u64; + self.lsn += SizeOfXLogLongPHD as u64; // TODO: verify the fields in the header @@ -105,12 +103,12 @@ impl WalStreamDecoder { } else if self.lsn % (XLOG_BLCKSZ as 
u64) == 0 { // parse page header - if self.inputbuf.remaining() < SizeOfXLogLongPHD { + if self.inputbuf.remaining() < SizeOfXLogShortPHD { return None; } self.decode_XLogPageHeaderData(); - self.lsn += SizeOfXLogLongPHD as u64; + self.lsn += SizeOfXLogShortPHD as u64; // TODO: verify the fields in the header @@ -168,10 +166,7 @@ impl WalStreamDecoder { if self.contlen == 0 { let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); - let result = WALRecord { - lsn: self.reclsn, - rec: recordbuf.freeze(), - }; + let result = (self.reclsn, recordbuf.freeze()); if self.lsn % 8 != 0 { self.padlen = 8 - (self.lsn % 8) as u32; @@ -194,7 +189,7 @@ impl WalStreamDecoder { #[allow(non_snake_case)] fn decode_XLogPageHeaderData(&mut self) -> XLogPageHeaderData { - let buf = &mut self.recordbuf; + let buf = &mut self.inputbuf; // FIXME: Assume little-endian @@ -205,6 +200,11 @@ impl WalStreamDecoder { xlp_pageaddr: buf.get_u64_le(), xlp_rem_len: buf.get_u32_le() }; + // 4 bytes of padding, on 64-bit systems + buf.advance(4); + + // FIXME: check that hdr.xlp_rem_len matches self.contlen + //println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len); return hdr; } @@ -214,7 +214,6 @@ impl WalStreamDecoder { let hdr : XLogLongPageHeaderData = XLogLongPageHeaderData { std: self.decode_XLogPageHeaderData(), - // FIXME: eat padding xlp_sysid: self.recordbuf.get_u64_le(), xlp_seg_size: self.recordbuf.get_u32_le(), xlp_xlog_blcksz: self.recordbuf.get_u32_le(), @@ -241,7 +240,7 @@ const BKPBLOCK_FORK_MASK:u8 = 0x0F; const _BKPBLOCK_FLAG_MASK:u8 = 0xF0; const BKPBLOCK_HAS_IMAGE:u8 = 0x10; /* block data is an XLogRecordBlockImage */ const BKPBLOCK_HAS_DATA:u8 = 0x20; -const _BKPBLOCK_WILL_INIT:u8 = 0x40; /* redo will re-init the page */ +const BKPBLOCK_WILL_INIT:u8 = 0x40; /* redo will re-init the page */ const BKPBLOCK_SAME_REL:u8 = 0x80; /* RelFileNode omitted, same as previous */ /* Information stored in bimg_info */ @@ -250,23 +249,24 @@ const 
BKPIMAGE_IS_COMPRESSED:u8 = 0x02; /* page image is compressed */ const BKPIMAGE_APPLY:u8 = 0x04; /* page image should be restored during replay */ -struct DecodedBkpBlock { +pub struct DecodedBkpBlock { /* Is this block ref in use? */ //in_use: bool, /* Identify the block this refers to */ - rnode_spcnode: u32, - rnode_dbnode: u32, - rnode_relnode: u32, - forknum: u8, - blkno: u32, + pub rnode_spcnode: u32, + pub rnode_dbnode: u32, + pub rnode_relnode: u32, + pub forknum: u8, + pub blkno: u32, /* copy of the fork_flags field from the XLogRecordBlockHeader */ flags: u8, /* Information on full-page image, if any */ has_image: bool, /* has image, even for consistency checking */ - apply_image: bool, /* has image that should be restored */ + pub apply_image: bool, /* has image that should be restored */ + pub will_init: bool, //char *bkp_image; hole_offset: u16, hole_length: u16, @@ -282,12 +282,19 @@ struct DecodedBkpBlock { #[allow(non_upper_case_globals)] const SizeOfXLogRecord:u32 = 24; +pub struct DecodedWALRecord { + pub lsn: u64, + pub record: Bytes, // raw XLogRecord + + pub blocks: Vec +} + // // Routines to decode a WAL record and figure out which blocks are modified // -pub fn decode_wal_record(rec: &WALRecord) { +pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { - let mut buf = rec.rec.clone(); + let mut buf = rec.clone(); // FIXME: assume little-endian here let xl_tot_len = buf.get_u32_le(); @@ -315,6 +322,7 @@ pub fn decode_wal_record(rec: &WALRecord) { let mut max_block_id = 0; let mut datatotal: u32 = 0; + let mut blocks: Vec = Vec::new(); while buf.remaining() > datatotal as usize { let block_id = buf.get_u8(); @@ -355,6 +363,7 @@ pub fn decode_wal_record(rec: &WALRecord) { flags: 0, has_image: false, apply_image: false, + will_init: false, hole_offset: 0, hole_length: 0, bimg_len: 0, @@ -376,14 +385,12 @@ pub fn decode_wal_record(rec: &WALRecord) { } max_block_id = block_id; - //blk.in_use = true; // FIXME: pointless - 
//blk.apply_image = false; - fork_flags = buf.get_u8(); blk.forknum = fork_flags & BKPBLOCK_FORK_MASK; blk.flags = fork_flags; blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0; blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0; + blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0; blk.data_len = buf.get_u16_le(); /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ @@ -532,6 +539,8 @@ pub fn decode_wal_record(rec: &WALRecord) { blk.blkno = buf.get_u32_le(); println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno); + + blocks.push(blk); } _ => { @@ -551,5 +560,10 @@ pub fn decode_wal_record(rec: &WALRecord) { */ // Since we don't care about the data payloads here, we're done. - + + return DecodedWALRecord { + lsn: lsn, + record: rec, + blocks: blocks + } } diff --git a/src/walreceiver.rs b/src/walreceiver.rs index e58527de44..8806a84d23 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -2,6 +2,8 @@ use tokio_stream::StreamExt; use tokio::runtime; use crate::waldecoder::WalStreamDecoder; +use crate::page_cache; +use crate::page_cache::BufferTag; use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode}; use postgres_protocol::message::backend::ReplicationMessage; @@ -69,16 +71,36 @@ pub async fn walreceiver_main() -> Result<(), Error> { waldecoder.feed_bytes(xlog_data.data()); loop { - let rec = waldecoder.poll_decode(); + if let Some((lsn, recdata)) = waldecoder.poll_decode() { - if rec.is_none() { + let decoded = crate::waldecoder::decode_wal_record(lsn, recdata.clone()); + println!("decoded record"); + + // Put the WAL record to the page cache. We make a separate copy of + // it for every block it modifies. 
(The actual WAL record is kept in + // a Bytes, which uses a reference counter for the underlying buffer, + // so having multiple copies of it doesn't cost that much) + for blk in decoded.blocks.iter() { + let tag = BufferTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u32, + blknum: blk.blkno + }; + + let rec = page_cache::WALRecord { + lsn: lsn, + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone() + }; + + page_cache::put_wal_record(tag, rec); + } + + } else { break; } - - crate::waldecoder::decode_wal_record(&rec.unwrap()); - println!("decoded record"); - - // TODO: Put the WAL record to the page cache } } ReplicationMessage::PrimaryKeepAlive(_keepalive) => { diff --git a/src/walredo.rs b/src/walredo.rs new file mode 100644 index 0000000000..598d2e6d79 --- /dev/null +++ b/src/walredo.rs @@ -0,0 +1,131 @@ +// +// WAL redo +// +// We rely on Postgres to perform WAL redo for us. We launch +// a postgres process in special "wal redo" mode that's similar +// to single-user mode. We then pass the WAL record and the previous +// page image, if any, to the postgres process, and ask the +// process to apply it. Then we get the page image back. Communication +// with the process happens via stdin/stdout +// +// TODO: Even though the postgres code runs in a separate process, +// it's not a secure sandbox. +// +use std::process::{Command, Stdio}; +use std::io::{Read, Write, Error}; +use std::assert; + +use bytes::{Bytes, BytesMut, BufMut}; + +use crate::page_cache::BufferTag; +use crate::page_cache::WALRecord; + + +// +// Apply given WAL records ('records') over an old page image. Returns +// new page image. +// +// +// FIXME: This is completely untested ATM. Will surely crash and burn. +// +pub fn apply_wal_records(tag: BufferTag, base_img: Option, records: &Vec) -> Result +{ + + // + // Start postgres binary in special WAL redo mode. 
+ // + let mut child = + Command::new("postgres") + .arg("--wal-redo") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .expect("postgres --wal-redo command failed to start"); + + let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + + // Send base image, if any. (If the record initializes the page, previous page + // version is not needed.) + stdin.write(&build_begin_redo_for_block_msg(tag))?; + if base_img.is_some() { + stdin.write(&build_push_page_msg(tag, base_img.unwrap()))?; + } + + // Send WAL records. + for rec in records.iter() { + let r = rec.clone(); + + stdin.write(&build_apply_record_msg(r.lsn, r.rec))?; + } + + // Read back new page image + stdin.write(&build_get_page_msg(tag))?; + let mut buf = vec![0u8; 8192]; + child.stdout.unwrap().read_exact(&mut buf)?; + + // Kill the process. This closes its stdin, which should signal the process + // to terminate. TODO: SIGKILL if needed + //child.wait(); + + return Ok(Bytes::from(buf)); +} + +fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes +{ + let mut buf = BytesMut::new(); + + buf.put_u8('B' as u8); + buf.put_u32(4 + 5*4); + buf.put_u32(tag.spcnode); + buf.put_u32(tag.dbnode); + buf.put_u32(tag.relnode); + buf.put_u32(tag.forknum); + buf.put_u32(tag.blknum); + + return buf.freeze(); +} + +fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes +{ + assert!(base_img.len() == 8192); + + let mut buf = BytesMut::new(); + + buf.put_u8('P' as u8); + buf.put_u32(4 + 5*4 + base_img.len() as u32); + buf.put_u32(tag.spcnode); + buf.put_u32(tag.dbnode); + buf.put_u32(tag.relnode); + buf.put_u32(tag.forknum); + buf.put_u32(tag.blknum); + buf.put(base_img); + + return buf.freeze(); +} + +fn build_apply_record_msg(lsn: u64, rec: Bytes) -> Bytes { + + let mut buf = BytesMut::new(); + + buf.put_u8('A' as u8); + buf.put_u32(4 + 8 + rec.len() as u32); + buf.put_u64(lsn); + buf.put(rec); + + return buf.freeze(); +} + +fn build_get_page_msg(tag: BufferTag, ) -> Bytes { + + let 
mut buf = BytesMut::new(); + + buf.put_u8('G' as u8); + buf.put_u32(4 + 5*4); + buf.put_u32(tag.spcnode); + buf.put_u32(tag.dbnode); + buf.put_u32(tag.relnode); + buf.put_u32(tag.forknum); + buf.put_u32(tag.blknum); + + return buf.freeze(); +}