mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-02 18:20:37 +00:00
Compare commits
1 Commits
hack/compu
...
two_phase_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9c7665c81 |
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
|||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tar::{Builder, Header};
|
use tar::{Builder, Header};
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
|
||||||
use crate::repository::{BufferTag, RelTag, Timeline};
|
use crate::repository::{BufferTag, RelTag, Timeline};
|
||||||
use postgres_ffi::relfile_utils::*;
|
use postgres_ffi::relfile_utils::*;
|
||||||
@@ -101,7 +102,6 @@ fn add_relmap_files(
|
|||||||
ar.append_path_with_name(&src_path, &dst_path)?;
|
ar.append_path_with_name(&src_path, &dst_path)?;
|
||||||
format!("base/{}/pg_filenode.map", db.dbnode)
|
format!("base/{}/pg_filenode.map", db.dbnode)
|
||||||
};
|
};
|
||||||
info!("Deliver {}", path);
|
|
||||||
assert!(img.len() == 512);
|
assert!(img.len() == 512);
|
||||||
let header = new_tar_header(&path, img.len() as u64)?;
|
let header = new_tar_header(&path, img.len() as u64)?;
|
||||||
ar.append(&header, &img[..])?;
|
ar.append(&header, &img[..])?;
|
||||||
@@ -128,9 +128,13 @@ fn add_twophase_files(
|
|||||||
blknum: *xid,
|
blknum: *xid,
|
||||||
};
|
};
|
||||||
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
buf.extend_from_slice(&img[..]);
|
||||||
|
let crc = crc32c::crc32c(&img[..]);
|
||||||
|
buf.put_u32_le(crc);
|
||||||
let path = format!("pg_twophase/{:>08X}", xid);
|
let path = format!("pg_twophase/{:>08X}", xid);
|
||||||
let header = new_tar_header(&path, img.len() as u64)?;
|
let header = new_tar_header(&path, buf.len() as u64)?;
|
||||||
ar.append(&header, &img[..])?;
|
ar.append(&header, &buf[..])?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -800,20 +800,21 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
|||||||
blocks.push(blk);
|
blocks.push(blk);
|
||||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||||
let mut blk = DecodedBkpBlock::new();
|
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
trace!(
|
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
trace!(
|
||||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||||
xlogrec.xl_prev & 0xffffffff,
|
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||||
xlogrec.xl_xid,
|
xlogrec.xl_prev & 0xffffffff,
|
||||||
blk.blkno,
|
xlogrec.xl_xid,
|
||||||
main_data_len
|
blk.blkno,
|
||||||
);
|
main_data_len
|
||||||
blocks.push(blk);
|
);
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
//parse commit record to extract subtrans entries
|
//parse commit record to extract subtrans entries
|
||||||
// xl_xact_commit starts with time of commit
|
// xl_xact_commit starts with time of commit
|
||||||
let _xact_time = buf.get_i64_le();
|
let _xact_time = buf.get_i64_le();
|
||||||
@@ -864,23 +865,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||||
let _xid = buf.get_u32_le();
|
let xid = buf.get_u32_le();
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
|
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
|
blocks.push(blk);
|
||||||
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
|
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
|
||||||
//TODO handle this to be able to restore pg_twophase on node start
|
//TODO handle this to be able to restore pg_twophase on node start
|
||||||
}
|
}
|
||||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||||
let mut blk = DecodedBkpBlock::new();
|
if info == pg_constants::XLOG_XACT_ABORT {
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
trace!(
|
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
trace!(
|
||||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||||
xlogrec.xl_prev & 0xffffffff,
|
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||||
xlogrec.xl_xid,
|
xlogrec.xl_prev & 0xffffffff,
|
||||||
blk.blkno,
|
xlogrec.xl_xid,
|
||||||
main_data_len
|
blk.blkno,
|
||||||
);
|
main_data_len
|
||||||
blocks.push(blk);
|
);
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
//parse abort record to extract subtrans entries
|
//parse abort record to extract subtrans entries
|
||||||
// xl_xact_abort starts with time of abort
|
// xl_xact_abort starts with time of abort
|
||||||
let _xact_time = buf.get_i64_le();
|
let _xact_time = buf.get_i64_le();
|
||||||
@@ -924,7 +931,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||||
let _xid = buf.get_u32_le();
|
let xid = buf.get_u32_le();
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
|
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
|
blocks.push(blk);
|
||||||
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
|
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
|
||||||
}
|
}
|
||||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||||
@@ -932,6 +943,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
|||||||
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
|
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
|
||||||
blk.blkno = xlogrec.xl_xid;
|
blk.blkno = xlogrec.xl_xid;
|
||||||
blk.will_init = true;
|
blk.will_init = true;
|
||||||
|
blocks.push(blk);
|
||||||
|
info!("Prepare transaction {}", xlogrec.xl_xid);
|
||||||
}
|
}
|
||||||
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
|
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||||
|
|||||||
@@ -287,9 +287,11 @@ impl PostgresRedoManagerInternal {
|
|||||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||||
let mut status = 0;
|
let mut status = 0;
|
||||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||||
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
|
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
|
||||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||||
|
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||||
|
}
|
||||||
//handle subtrans
|
//handle subtrans
|
||||||
let _xact_time = buf.get_i64_le();
|
let _xact_time = buf.get_i64_le();
|
||||||
let mut xinfo = 0;
|
let mut xinfo = 0;
|
||||||
@@ -313,9 +315,38 @@ impl PostgresRedoManagerInternal {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||||
|
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||||
|
let nrels = buf.get_i32_le();
|
||||||
|
for _i in 0..nrels {
|
||||||
|
let spcnode = buf.get_u32_le();
|
||||||
|
let dbnode = buf.get_u32_le();
|
||||||
|
let relnode = buf.get_u32_le();
|
||||||
|
//TODO handle this too?
|
||||||
|
trace!(
|
||||||
|
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||||
|
spcnode,
|
||||||
|
dbnode,
|
||||||
|
relnode
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||||
|
let nmsgs = buf.get_i32_le();
|
||||||
|
for _i in 0..nmsgs {
|
||||||
|
let sizeof_shared_invalidation_message = 0;
|
||||||
|
buf.advance(sizeof_shared_invalidation_message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||||
|
let xid = buf.get_u32_le();
|
||||||
|
transaction_id_set_status(xid, status, &mut page);
|
||||||
|
}
|
||||||
|
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||||
status = pg_constants::TRANSACTION_STATUS_ABORTED;
|
status = pg_constants::TRANSACTION_STATUS_ABORTED;
|
||||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
if info == pg_constants::XLOG_XACT_ABORT {
|
||||||
|
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||||
|
}
|
||||||
//handle subtrans
|
//handle subtrans
|
||||||
let _xact_time = buf.get_i64_le();
|
let _xact_time = buf.get_i64_le();
|
||||||
let mut xinfo = 0;
|
let mut xinfo = 0;
|
||||||
@@ -339,8 +370,39 @@ impl PostgresRedoManagerInternal {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if info != pg_constants::XLOG_XACT_PREPARE {
|
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||||
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||||
|
let nrels = buf.get_i32_le();
|
||||||
|
for _i in 0..nrels {
|
||||||
|
let spcnode = buf.get_u32_le();
|
||||||
|
let dbnode = buf.get_u32_le();
|
||||||
|
let relnode = buf.get_u32_le();
|
||||||
|
//TODO handle this too?
|
||||||
|
trace!(
|
||||||
|
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||||
|
spcnode,
|
||||||
|
dbnode,
|
||||||
|
relnode
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||||
|
let nmsgs = buf.get_i32_le();
|
||||||
|
for _i in 0..nmsgs {
|
||||||
|
let sizeof_shared_invalidation_message = 0;
|
||||||
|
buf.advance(sizeof_shared_invalidation_message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||||
|
let xid = buf.get_u32_le();
|
||||||
|
transaction_id_set_status(xid, status, &mut page);
|
||||||
|
}
|
||||||
|
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||||
|
info!("Apply prepare {} record", xlogrec.xl_xid);
|
||||||
|
page.clear();
|
||||||
|
page.extend_from_slice(&buf[..]);
|
||||||
|
} else {
|
||||||
|
error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
||||||
status,
|
status,
|
||||||
record.lsn,
|
record.lsn,
|
||||||
record.main_data_offset, record.rec.len());
|
record.main_data_offset, record.rec.len());
|
||||||
|
|||||||
@@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
|
|||||||
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||||
pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||||
|
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||||
|
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||||
|
|
||||||
// From slru.h
|
// From slru.h
|
||||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||||
|
|||||||
58
test_runner/batch_others/test_twophase.py
Normal file
58
test_runner/batch_others/test_twophase.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
#
|
||||||
|
# Test branching, when a transaction is in prepared state
|
||||||
|
#
|
||||||
|
import pytest
|
||||||
|
import getpass
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||||
|
|
||||||
|
def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
    """Check that prepared (two-phase) transactions are carried over to a branch.

    On the 'test_twophase' branch two transactions are left in the prepared
    state and a third one is prepared and then committed. A second branch is
    created at the current WAL insert LSN; there one prepared transaction is
    committed and the other rolled back. The original branch must still see
    only the row that was committed before branching.

    `pageserver` and `pg_bin` are not referenced in the body; they are kept in
    the signature so pytest still sets up those fixtures.
    """
    # Imported locally so the function is self-contained with respect to the
    # module-level import list.
    from contextlib import closing

    zenith_cli.run(["branch", "test_twophase", "empty"])

    pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5'])
    print("postgres is running on 'test_twophase' branch")

    # closing() releases the connection even when an assertion below fails.
    with closing(psycopg2.connect(pg.connstr())) as conn:
        # Autocommit mode: BEGIN / PREPARE TRANSACTION are sent explicitly below.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()

        cur.execute('CREATE TABLE foo (t text)')

        # Prepare a transaction that will insert a row
        cur.execute('BEGIN')
        cur.execute("INSERT INTO foo VALUES ('one')")
        cur.execute("PREPARE TRANSACTION 'insert_one'")

        # Prepare another transaction that will insert a row
        cur.execute('BEGIN')
        cur.execute("INSERT INTO foo VALUES ('two')")
        cur.execute("PREPARE TRANSACTION 'insert_two'")

        # Prepare a third transaction and commit it right away, before branching
        cur.execute('BEGIN')
        cur.execute("INSERT INTO foo VALUES ('three')")
        cur.execute("PREPARE TRANSACTION 'insert_three'")
        cur.execute("COMMIT PREPARED 'insert_three'")

        cur.execute('SELECT pg_current_wal_insert_lsn()')
        lsn = cur.fetchone()[0]

        # Create a branch with the first two transactions still in prepared state
        zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@" + lsn])

        pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5'])
        with closing(psycopg2.connect(pg2.connstr())) as conn2:
            conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur2 = conn2.cursor()

            # On the new branch, commit one of the prepared transactions, abort the other one.
            cur2.execute("COMMIT PREPARED 'insert_one'")
            cur2.execute("ROLLBACK PREPARED 'insert_two'")

            # ORDER BY makes the comparison independent of physical row order.
            cur2.execute('SELECT * FROM foo ORDER BY t')
            assert cur2.fetchall() == [('one',), ('three',)]

        # On the original branch 'one' and 'two' are still only prepared, so
        # neither is visible; only the already committed 'three' is.
        cur.execute('SELECT * FROM foo ORDER BY t')
        assert cur.fetchall() == [('three',)]
|
||||||
Reference in New Issue
Block a user