Add support for two-phase transactions

Konstantin Knizhnik
2021-05-16 00:03:08 +03:00
parent 6b11b4250e
commit 04dc698d4b
8 changed files with 169 additions and 55 deletions
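
Prepared (two-phase) transactions, created with PREPARE TRANSACTION, leave per-transaction state files in pg_twophase that must survive until COMMIT PREPARED or ROLLBACK PREPARED. This commit makes the page server track them: PREPARE WAL records are decoded into a new synthetic fork (PG_TWOPHASE_FORKNUM) keyed by XID, basebackup emits the corresponding pg_twophase files, snapshot restore loads them back, garbage collection reclaims two-phase state behind the horizon, and the Timeline API gains get_twophase() next to an LSN-aware get_databases().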


@@ -12,17 +12,22 @@ use postgres_ffi::relfile_utils::*;
use zenith_utils::lsn::Lsn;
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
header.set_mode(0b110000000);
header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
header.set_cksum();
Ok(header)
header.set_mode(0b110000000);
header.set_mtime(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
header.set_cksum();
Ok(header)
}
//
// Generate SRLU segment files from repoistory
// Generate SLRU segment files from repository
//
fn add_slru_segments(
ar: &mut Builder<&mut dyn Write>,
@@ -52,7 +57,7 @@ fn add_slru_segments(
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if curr_segno.is_some() && curr_segno.unwrap() != segno {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
seg_buf = [0u8; SEG_SIZE];
}
@@ -65,21 +70,21 @@ fn add_slru_segments(
}
if curr_segno.is_some() {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
}
Ok(())
}
//
// Extract pg_filenode.map files from repoistory
// Extract pg_filenode.map files from repository
//
fn add_relmap_files(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
for db in timeline.get_databases()?.iter() {
for db in timeline.get_databases(lsn)?.iter() {
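// Pass the target LSN so only databases visible at the basebackup point are included.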
let tag = BufferTag {
rel: *db,
blknum: 0,
@@ -92,7 +97,34 @@ fn add_relmap_files(
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
format!("base/{}/pg_filenode.map", db.dbnode)
};
let header = new_tar_header(&path, 512)?;
assert!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
}
Ok(())
}
//
// Extract twophase state files
//
fn add_twophase_files(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
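// Each prepared transaction's state is stored as a page image under the
// synthetic PG_TWOPHASE_FORKNUM fork, with the XID recorded in blknum.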
for xid in timeline.get_twophase(lsn)?.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
},
blknum: *xid,
};
let img = timeline.get_page_at_lsn(tag, lsn)?;
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
}
Ok(())
@@ -175,6 +207,7 @@ pub fn send_tarball_at_lsn(
lsn,
)?;
add_relmap_files(&mut ar, timeline, lsn)?;
add_twophase_files(&mut ar, timeline, lsn)?;
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page


@@ -827,12 +827,14 @@ impl Connection {
) -> anyhow::Result<()> {
// check that the timeline exists
let repository = page_cache::get_repository();
let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| {
anyhow!(
let timeline = repository
.get_or_restore_timeline(timelineid)
.map_err(|_| {
anyhow!(
"client requested basebackup on timeline {} which does not exist in page server",
timelineid
)
})?;
})?;
/* switch client to COPYOUT */
let stream = &mut self.stream;
stream.write_u8(b'H')?;


@@ -1,6 +1,6 @@
pub mod rocksdb;
use crate::waldecoder::{DecodedWALRecord, Oid};
use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId};
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -112,7 +112,10 @@ pub trait Timeline {
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
/// Get vector of databases (represented using RelTag; only the dbnode and spcnode fields are used)
fn get_databases(&self) -> Result<Vec<RelTag>>;
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
}
#[derive(Clone)]


@@ -7,7 +7,7 @@
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate};
use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId, XlCreateDatabase, XlSmgrTruncate};
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -423,6 +423,19 @@ impl RocksTimeline {
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// Special handling for PREPARE WAL records: two-phase state behind the
// GC horizon is marked unused so that it can be reclaimed
if last_lsn < horizon
&& key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
{
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(k, &v[..])?;
deleted += 1;
}
maxkey = minkey;
continue;
}
// reconstruct most recent page version
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
// force reconstruction of most recent page version
@@ -632,8 +645,45 @@ impl Timeline for RocksTimeline {
self.relsize_get_nowait(rel, lsn)
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut gxacts = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
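// All two-phase entries sort contiguously because forknum is the leading
// component of the packed key; scan forward until the fork changes.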
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
gxacts.push(key.tag.blknum); // XID
}
iter.next();
}
Ok(gxacts)
}
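
The scan above follows a simple pattern: pack the smallest possible key for the fork, seek there, and iterate until the fork id changes. A self-contained sketch of the same pattern over std's BTreeMap, standing in for the RocksDB iterator (key layout simplified to (forknum, blknum, lsn); all names are hypothetical):

use std::collections::BTreeMap;

// Simplified stand-in for the packed CacheKey: (forknum, blknum-or-XID, lsn).
type Key = (u8, u32, u64);

fn list_twophase(db: &BTreeMap<Key, Vec<u8>>, fork: u8, lsn: u64) -> Vec<u32> {
    let mut xids = Vec::new();
    // Range-scan from the fork's minimal key; stop once the fork id changes.
    for (&(f, xid, key_lsn), _) in db.range((fork, 0, 0)..) {
        if f != fork {
            break; // done with this fork
        }
        if key_lsn <= lsn {
            xids.push(xid);
        }
    }
    xids
}

fn main() {
    let mut db: BTreeMap<Key, Vec<u8>> = BTreeMap::new();
    db.insert((47, 0x2F1, 10), vec![]); // prepared before the snapshot LSN
    db.insert((47, 0x2F5, 99), vec![]); // prepared after it: filtered out
    db.insert((48, 1, 5), vec![]); // different fork: ignored
    assert_eq!(list_twophase(&db, 47, 50), vec![0x2F1]);
}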
/// Get databases. This function is used to locate per-database pg_filenode.map files
fn get_databases(&self) -> Result<Vec<RelTag>> {
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
@@ -658,13 +708,13 @@ impl Timeline for RocksTimeline {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = RelTag::unpack(&mut buf);
if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
let key = CacheKey::unpack(&mut buf);
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if tag != prev_tag {
dbs.push(tag); // collect unique tags
prev_tag = tag;
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
@@ -816,8 +866,8 @@ impl Timeline for RocksTimeline {
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
// Zero size of page image indicates that SLRU page was truncated
if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM {
// Zero size of page image indicates that page can be removed
if img_len == 0 {
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
// records already marked for deletion
return;


@@ -23,7 +23,7 @@ use anyhow::Result;
use bytes::Bytes;
use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder};
use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
@@ -121,25 +121,27 @@ fn restore_snapshot(
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_CONTROLFILE_FORKNUM,
&direntry.path(),
)?,
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_CONTROLFILE_FORKNUM,
0,
&direntry.path(),
)?,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_FILENODEMAP_FORKNUM,
&direntry.path(),
)?,
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_FILENODEMAP_FORKNUM,
0,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => restore_relfile(
@@ -167,15 +169,16 @@ fn restore_snapshot(
// These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
conf,
timeline,
timelineid,
snapshot,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
pg_constants::PG_FILENODEMAP_FORKNUM,
&direntry.path(),
)?,
pg_constants::PG_FILENODEMAP_FORKNUM,
0,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => restore_relfile(
@@ -221,6 +224,21 @@ fn restore_snapshot(
&entry.path(),
)?;
}
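// Each file in pg_twophase holds the state of one prepared transaction;
// its file name is the transaction's XID in hexadecimal.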
for entry in fs::read_dir(snapshotpath.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(entry.file_name().to_str().unwrap(), 16)?;
restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
0,
0,
pg_constants::PG_TWOPHASE_FORKNUM,
xid,
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc
Ok(())
@@ -296,6 +314,7 @@ fn restore_nonrel_file(
spcoid: Oid,
dboid: Oid,
forknum: u8,
blknum: u32,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
@@ -314,7 +333,7 @@ fn restore_nonrel_file(
relnode: 0,
forknum,
},
blknum: 0,
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
Ok(())
@@ -344,7 +363,7 @@ fn restore_slru_file(
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
dbnode: 0,
relnode: 0,
forknum,
},


@@ -915,6 +915,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let _xid = buf.get_u32_le();
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
// Describe the PREPARE payload as a synthetic will_init block in the
// pg_twophase fork, keyed by the prepared transaction's XID.
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
blk.blkno = xlogrec.xl_xid;
blk.will_init = true;
// Without registering the block this branch is a no-op; `blocks` is
// assumed to be the decoder's block list, as for the other resource managers.
blocks.push(blk);
}
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;


@@ -359,7 +359,7 @@ impl PostgresRedoManagerInternal {
}
}
}
} else {
} else if info != pg_constants::XLOG_XACT_PREPARE {
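// XLOG_XACT_PREPARE is expected and intentionally skipped here; the trace
// below now fires only for genuinely unhandled XACT records.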
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn,


@@ -22,6 +22,7 @@ pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
pub const PG_XACT_FORKNUM: u8 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
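// Synthetic fork number used by the page server to key pg_twophase state
// files; it does not correspond to a real PostgreSQL relation fork.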
pub const PG_TWOPHASE_FORKNUM: u8 = 47;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
@@ -50,6 +51,7 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
// From xact.h
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
// From slru.h