mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Fix 2PC support
This commit is contained in:
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tar::{Builder, Header};
|
||||
use walkdir::WalkDir;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Timeline};
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
@@ -101,7 +102,6 @@ fn add_relmap_files(
|
||||
ar.append_path_with_name(&src_path, &dst_path)?;
|
||||
format!("base/{}/pg_filenode.map", db.dbnode)
|
||||
};
|
||||
info!("Deliver {}", path);
|
||||
assert!(img.len() == 512);
|
||||
let header = new_tar_header(&path, img.len() as u64)?;
|
||||
ar.append(&header, &img[..])?;
|
||||
@@ -128,9 +128,13 @@ fn add_twophase_files(
|
||||
blknum: *xid,
|
||||
};
|
||||
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
||||
let mut buf = BytesMut::new();
|
||||
buf.extend_from_slice(&img[..]);
|
||||
let crc = crc32c::crc32c(&img[..]);
|
||||
buf.put_u32_le(crc);
|
||||
let path = format!("pg_twophase/{:>08X}", xid);
|
||||
let header = new_tar_header(&path, img.len() as u64)?;
|
||||
ar.append(&header, &img[..])?;
|
||||
let header = new_tar_header(&path, buf.len() as u64)?;
|
||||
ar.append(&header, &buf[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -800,20 +800,21 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
blocks.push(blk);
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
|
||||
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
}
|
||||
//parse commit record to extract subtrans entries
|
||||
// xl_xact_commit starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
@@ -864,23 +865,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
let xid = buf.get_u32_le();
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
blocks.push(blk);
|
||||
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
|
||||
//TODO handle this to be able to restore pg_twophase on node start
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
}
|
||||
//parse abort record to extract subtrans entries
|
||||
// xl_xact_abort starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
@@ -924,7 +931,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
let xid = buf.get_u32_le();
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
blocks.push(blk);
|
||||
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
@@ -932,6 +943,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid;
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
info!("Prepare transaction {}", xlogrec.xl_xid);
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
|
||||
@@ -287,9 +287,11 @@ impl PostgresRedoManagerInternal {
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let mut status = 0;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
}
|
||||
//handle subtrans
|
||||
let _xact_time = buf.get_i64_le();
|
||||
let mut xinfo = 0;
|
||||
@@ -313,9 +315,38 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||
let nmsgs = buf.get_i32_le();
|
||||
for _i in 0..nmsgs {
|
||||
let sizeof_shared_invalidation_message = 0;
|
||||
buf.advance(sizeof_shared_invalidation_message);
|
||||
}
|
||||
}
|
||||
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||
let xid = buf.get_u32_le();
|
||||
transaction_id_set_status(xid, status, &mut page);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
status = pg_constants::TRANSACTION_STATUS_ABORTED;
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
if info == pg_constants::XLOG_XACT_ABORT {
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
}
|
||||
//handle subtrans
|
||||
let _xact_time = buf.get_i64_le();
|
||||
let mut xinfo = 0;
|
||||
@@ -339,8 +370,39 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info != pg_constants::XLOG_XACT_PREPARE {
|
||||
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
||||
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||
let nmsgs = buf.get_i32_le();
|
||||
for _i in 0..nmsgs {
|
||||
let sizeof_shared_invalidation_message = 0;
|
||||
buf.advance(sizeof_shared_invalidation_message);
|
||||
}
|
||||
}
|
||||
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||
let xid = buf.get_u32_le();
|
||||
transaction_id_set_status(xid, status, &mut page);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
info!("Apply prepare {} record", xlogrec.xl_xid);
|
||||
page.clear();
|
||||
page.extend_from_slice(&buf[..]);
|
||||
} else {
|
||||
error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
||||
status,
|
||||
record.lsn,
|
||||
record.main_data_offset, record.rec.len());
|
||||
|
||||
@@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
|
||||
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||
pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
|
||||
// From srlu.h
|
||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||
|
||||
58
test_runner/batch_others/test_twophase.py
Normal file
58
test_runner/batch_others/test_twophase.py
Normal file
@@ -0,0 +1,58 @@
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
import pytest
|
||||
import getpass
|
||||
import psycopg2
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
|
||||
zenith_cli.run(["branch", "test_twophase", "empty"]);
|
||||
|
||||
pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5'])
|
||||
print("postgres is running on 'test_twophase' branch")
|
||||
|
||||
conn = psycopg2.connect(pg.connstr());
|
||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute('CREATE TABLE foo (t text)');
|
||||
|
||||
# Prepare a transaction that will insert a row
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('one')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_one'");
|
||||
|
||||
# Prepare another transaction that will insert a row
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('two')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_two'");
|
||||
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('three')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_three'");
|
||||
cur.execute("COMMIT PREPARED 'insert_three'");
|
||||
|
||||
cur.execute('SELECT pg_current_wal_insert_lsn()');
|
||||
lsn = cur.fetchone()[0]
|
||||
|
||||
# Create a branch with the transaction in prepared state
|
||||
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@"+lsn]);
|
||||
|
||||
pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5'])
|
||||
conn2 = psycopg2.connect(pg2.connstr());
|
||||
conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur2 = conn2.cursor()
|
||||
|
||||
# On the new branch, commit one of the prepared transactions, abort the other one.
|
||||
cur2.execute("COMMIT PREPARED 'insert_one'");
|
||||
cur2.execute("ROLLBACK PREPARED 'insert_two'");
|
||||
|
||||
cur2.execute('SELECT * FROM foo');
|
||||
assert(cur2.fetchall() == [('one',),('three',)]);
|
||||
|
||||
# Neither insert is visible on the original branch, the transactions are still
|
||||
# in prepared state there.
|
||||
cur.execute('SELECT * FROM foo');
|
||||
assert(cur.fetchall() == [('three',)]);
|
||||
Reference in New Issue
Block a user