From 04dc698d4be8c45d0ea8be7f649735f0144733a1 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 16 May 2021 00:03:08 +0300 Subject: [PATCH] Add support of twophase transactions --- pageserver/src/basebackup.rs | 57 ++++++++++++++++----- pageserver/src/page_service.rs | 8 +-- pageserver/src/repository.rs | 7 ++- pageserver/src/repository/rocksdb.rs | 68 +++++++++++++++++++++---- pageserver/src/restore_local_repo.rs | 75 +++++++++++++++++----------- pageserver/src/waldecoder.rs | 5 ++ pageserver/src/walredo.rs | 2 +- postgres_ffi/src/pg_constants.rs | 2 + 8 files changed, 169 insertions(+), 55 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 6314c3586d..cacd5c515f 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -12,17 +12,22 @@ use postgres_ffi::relfile_utils::*; use zenith_utils::lsn::Lsn; fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{ - let mut header = Header::new_gnu(); - header.set_size(size); + let mut header = Header::new_gnu(); + header.set_size(size); header.set_path(path)?; - header.set_mode(0b110000000); - header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()); - header.set_cksum(); - Ok(header) + header.set_mode(0b110000000); + header.set_mtime( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + ); + header.set_cksum(); + Ok(header) } // -// Generate SRLU segment files from repoistory +// Generate SRLU segment files from repository // fn add_slru_segments( ar: &mut Builder<&mut dyn Write>, @@ -52,7 +57,7 @@ fn add_slru_segments( let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; if curr_segno.is_some() && curr_segno.unwrap() != segno { let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); - let header = new_tar_header(&segname, SEG_SIZE as u64)?; + let header = new_tar_header(&segname, SEG_SIZE as u64)?; ar.append(&header, &seg_buf[..])?; seg_buf = [0u8; SEG_SIZE]; } @@ -65,21 +70,21 @@ fn add_slru_segments( } if curr_segno.is_some() { let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); - let header = new_tar_header(&segname, SEG_SIZE as u64)?; + let header = new_tar_header(&segname, SEG_SIZE as u64)?; ar.append(&header, &seg_buf[..])?; } Ok(()) } // -// Extract pg_filenode.map files from repoistory +// Extract pg_filenode.map files from repository // fn add_relmap_files( ar: &mut Builder<&mut dyn Write>, timeline: &Arc, lsn: Lsn, ) -> anyhow::Result<()> { - for db in timeline.get_databases()?.iter() { + for db in timeline.get_databases(lsn)?.iter() { let tag = BufferTag { rel: *db, blknum: 0, @@ -92,7 +97,34 @@ fn add_relmap_files( assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); format!("base/{}/pg_filenode.map", db.dbnode) }; - let header = new_tar_header(&path, 512)?; + assert!(img.len() == 512); + let header = new_tar_header(&path, img.len() as u64)?; + ar.append(&header, &img[..])?; + } + Ok(()) +} + +// +// Extract twophase state files +// +fn add_twophase_files( + ar: &mut Builder<&mut dyn Write>, + timeline: &Arc, + lsn: Lsn, +) -> anyhow::Result<()> { + for xid in timeline.get_twophase(lsn)?.iter() { + let tag = BufferTag { + rel: RelTag { + spcnode: 0, + dbnode: 0, + relnode: 0, + forknum: pg_constants::PG_TWOPHASE_FORKNUM, + }, + blknum: *xid, + }; + let img = timeline.get_page_at_lsn(tag, lsn)?; + let path = format!("pg_twophase/{:>08X}", xid); + let header = new_tar_header(&path, img.len() as u64)?; ar.append(&header, &img[..])?; } Ok(()) @@ -175,6 +207,7 @@ pub fn send_tarball_at_lsn( lsn, )?; add_relmap_files(&mut ar, timeline, lsn)?; + add_twophase_files(&mut ar, timeline, lsn)?; // FIXME: Also send all the WAL. The compute node would only need // the WAL that applies to non-relation files, because the page diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 79998c37ba..c1e159d3bb 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -827,12 +827,14 @@ impl Connection { ) -> anyhow::Result<()> { // check that the timeline exists let repository = page_cache::get_repository(); - let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| { - anyhow!( + let timeline = repository + .get_or_restore_timeline(timelineid) + .map_err(|_| { + anyhow!( "client requested basebackup on timeline {} which does not exist in page server", timelineid ) - })?; + })?; /* switch client to COPYOUT */ let stream = &mut self.stream; stream.write_u8(b'H')?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index cb3eb5f67e..05c3eb6cce 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,6 @@ pub mod rocksdb; -use crate::waldecoder::{DecodedWALRecord, Oid}; +use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId}; use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -112,7 +112,10 @@ pub trait Timeline { fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>; /// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used) - fn get_databases(&self) -> Result>; + fn get_databases(&self, lsn: Lsn) -> Result>; + + /// Get vector of prepared twophase transactions + fn get_twophase(&self, lsn: Lsn) -> Result>; } #[derive(Clone)] diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 0ee33e11e3..48e50afeda 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -7,7 +7,7 @@ use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; use crate::restore_local_repo::restore_timeline; -use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate}; +use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId, XlCreateDatabase, XlSmgrTruncate}; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::ZTimelineId; @@ -423,6 +423,19 @@ impl RocksTimeline { let mut minkey = maxkey.clone(); minkey.lsn = Lsn(0); // first version + // Special handling of delete of PREPARE WAL record + if last_lsn < horizon + && key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM + { + if (v[0] & UNUSED_VERSION_FLAG) == 0 { + let mut v = v.to_owned(); + v[0] |= UNUSED_VERSION_FLAG; + self.db.put(k, &v[..])?; + deleted += 1; + } + maxkey = minkey; + continue; + } // reconstruct most recent page version if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { // force reconstruction of most recent page version @@ -632,8 +645,45 @@ impl Timeline for RocksTimeline { self.relsize_get_nowait(rel, lsn) } + /// Get vector of prepared twophase transactions + fn get_twophase(&self, lsn: Lsn) -> Result> { + let key = CacheKey { + // minimal key + tag: BufferTag { + rel: RelTag { + forknum: pg_constants::PG_TWOPHASE_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut gxacts = Vec::new(); + + let mut iter = self.db.raw_iterator(); + iter.seek(&buf[..]); + while iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM { + break; // we are done with this fork + } + if key.lsn <= lsn { + gxacts.push(key.tag.blknum); // XID + } + iter.next(); + } + return Ok(gxacts); + } + /// Get databases. This function is used to local pg_filenode.map files - fn get_databases(&self) -> Result> { + fn get_databases(&self, lsn: Lsn) -> Result> { let key = CacheKey { // minimal key tag: BufferTag { @@ -658,13 +708,13 @@ impl Timeline for RocksTimeline { let k = iter.key().unwrap(); buf.clear(); buf.extend_from_slice(&k); - let tag = RelTag::unpack(&mut buf); - if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { + let key = CacheKey::unpack(&mut buf); + if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { break; // we are done with this fork } - if tag != prev_tag { - dbs.push(tag); // collect unique tags - prev_tag = tag; + if key.tag.rel != prev_tag && key.lsn <= lsn { + prev_tag = key.tag.rel; + dbs.push(prev_tag); // collect unique tags } iter.next(); } @@ -816,8 +866,8 @@ impl Timeline for RocksTimeline { let mut val_buf = BytesMut::new(); content.pack(&mut val_buf); - // Zero size of page image indicates that SLRU page was truncated - if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM { + // Zero size of page image indicates that page can be removed + if img_len == 0 { if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 { // records already marked for deletion return; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 1c24964fc1..0824245451 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -23,7 +23,7 @@ use anyhow::Result; use bytes::Bytes; use crate::repository::{BufferTag, RelTag, Timeline}; -use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder}; +use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::pg_constants; @@ -121,25 +121,27 @@ fn restore_snapshot( // These special files appear in the snapshot, but are not needed by the page server Some("pg_control") => restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::GLOBALTABLESPACE_OID, - 0, - pg_constants::PG_CONTROLFILE_FORKNUM, - &direntry.path(), - )?, + conf, + timeline, + timelineid, + snapshot, + pg_constants::GLOBALTABLESPACE_OID, + 0, + pg_constants::PG_CONTROLFILE_FORKNUM, + 0, + &direntry.path(), + )?, Some("pg_filenode.map") => restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::GLOBALTABLESPACE_OID, - 0, - pg_constants::PG_FILENODEMAP_FORKNUM, - &direntry.path(), - )?, + conf, + timeline, + timelineid, + snapshot, + pg_constants::GLOBALTABLESPACE_OID, + 0, + pg_constants::PG_FILENODEMAP_FORKNUM, + 0, + &direntry.path(), + )?, // Load any relation files into the page server _ => restore_relfile( @@ -167,15 +169,16 @@ fn restore_snapshot( // These special files appear in the snapshot, but are not needed by the page server Some("PG_VERSION") => continue, Some("pg_filenode.map") => restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, + conf, + timeline, + timelineid, + snapshot, pg_constants::DEFAULTTABLESPACE_OID, dboid, - pg_constants::PG_FILENODEMAP_FORKNUM, - &direntry.path(), - )?, + pg_constants::PG_FILENODEMAP_FORKNUM, + 0, + &direntry.path(), + )?, // Load any relation files into the page server _ => restore_relfile( @@ -221,6 +224,21 @@ fn restore_snapshot( &entry.path(), )?; } + for entry in fs::read_dir(snapshotpath.join("pg_twophase"))? { + let entry = entry?; + let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?; + restore_nonrel_file( + conf, + timeline, + timelineid, + snapshot, + 0, + 0, + pg_constants::PG_TWOPHASE_FORKNUM, + xid, + &entry.path(), + )?; + } // TODO: Scan pg_tblspc Ok(()) @@ -296,6 +314,7 @@ fn restore_nonrel_file( spcoid: Oid, dboid: Oid, forknum: u8, + blknum: u32, path: &Path, ) -> Result<()> { let lsn = Lsn::from_hex(snapshot)?; @@ -314,7 +333,7 @@ fn restore_nonrel_file( relnode: 0, forknum, }, - blknum: 0, + blknum, }; timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..])); Ok(()) @@ -344,7 +363,7 @@ fn restore_slru_file( let tag = BufferTag { rel: RelTag { spcnode: 0, - dbnode: 0, + dbnode: 0, relnode: 0, forknum, }, diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index ba8f9af6de..352de3451f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -915,6 +915,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let _xid = buf.get_u32_le(); trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } + } else if info == pg_constants::XLOG_XACT_PREPARE { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM; + blk.blkno = xlogrec.xl_xid; + blk.will_init = true; } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 5a7dde01e6..02167e5123 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -359,7 +359,7 @@ impl PostgresRedoManagerInternal { } } } - } else { + } else if info != pg_constants::XLOG_XACT_PREPARE { trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", status, record.lsn, diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index ebbe687502..1596e02638 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -22,6 +22,7 @@ pub const PG_FILENODEMAP_FORKNUM: u8 = 43; pub const PG_XACT_FORKNUM: u8 = 44; pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45; pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46; +pub const PG_TWOPHASE_FORKNUM: u8 = 47; // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; @@ -50,6 +51,7 @@ pub const CLOG_TRUNCATE: u8 = 0x10; // From xact.h pub const XLOG_XACT_COMMIT: u8 = 0x00; +pub const XLOG_XACT_PREPARE: u8 = 0x10; pub const XLOG_XACT_ABORT: u8 = 0x20; // From srlu.h