From e9c7665c81a56f5cdc3920f3abe3be8456e20c3e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 21 May 2021 20:01:44 +0300 Subject: [PATCH] Fix 2PC support --- pageserver/src/basebackup.rs | 10 ++- pageserver/src/waldecoder.rs | 71 +++++++++++++--------- pageserver/src/walredo.rs | 74 +++++++++++++++++++++-- postgres_ffi/src/pg_constants.rs | 2 + test_runner/batch_others/test_twophase.py | 58 ++++++++++++++++++ 5 files changed, 177 insertions(+), 38 deletions(-) create mode 100644 test_runner/batch_others/test_twophase.py diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 7ac1f30cf2..740b00beba 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::SystemTime; use tar::{Builder, Header}; use walkdir::WalkDir; +use bytes::{BufMut, BytesMut}; use crate::repository::{BufferTag, RelTag, Timeline}; use postgres_ffi::relfile_utils::*; @@ -101,7 +102,6 @@ fn add_relmap_files( ar.append_path_with_name(&src_path, &dst_path)?; format!("base/{}/pg_filenode.map", db.dbnode) }; - info!("Deliver {}", path); assert!(img.len() == 512); let header = new_tar_header(&path, img.len() as u64)?; ar.append(&header, &img[..])?; @@ -128,9 +128,13 @@ fn add_twophase_files( blknum: *xid, }; let img = timeline.get_page_at_lsn(tag, lsn)?; + let mut buf = BytesMut::new(); + buf.extend_from_slice(&img[..]); + let crc = crc32c::crc32c(&img[..]); + buf.put_u32_le(crc); let path = format!("pg_twophase/{:>08X}", xid); - let header = new_tar_header(&path, img.len() as u64)?; - ar.append(&header, &img[..])?; + let header = new_tar_header(&path, buf.len() as u64)?; + ar.append(&header, &buf[..])?; } Ok(()) } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 8ab7c2e507..de9f312cd1 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -800,20 +800,21 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW blocks.push(blk); } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - if info == pg_constants::XLOG_XACT_COMMIT { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; - trace!( - "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xlogrec.xl_info, (xlogrec.xl_prev >> 32), - xlogrec.xl_prev & 0xffffffff, - xlogrec.xl_xid, - blk.blkno, - main_data_len - ); - blocks.push(blk); - + if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + if info == pg_constants::XLOG_XACT_COMMIT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM; + blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, + blk.blkno, + main_data_len + ); + blocks.push(blk); + } //parse commit record to extract subtrans entries // xl_xact_commit starts with time of commit let _xact_time = buf.get_i64_le(); @@ -864,23 +865,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); + let xid = buf.get_u32_le(); + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM; + blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + blocks.push(blk); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); //TODO handle this to be able to restore pg_twophase on node start } - } else if info == pg_constants::XLOG_XACT_ABORT { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; - trace!( - "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xlogrec.xl_info, (xlogrec.xl_prev >> 32), - xlogrec.xl_prev & 0xffffffff, - xlogrec.xl_xid, - blk.blkno, - main_data_len - ); - blocks.push(blk); + } else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED { + if info == pg_constants::XLOG_XACT_ABORT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM; + blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, + blk.blkno, + main_data_len + ); + blocks.push(blk); + } //parse abort record to extract subtrans entries // xl_xact_abort starts with time of commit let _xact_time = buf.get_i64_le(); @@ -924,7 +931,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); + let xid = buf.get_u32_le(); + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM; + blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + blocks.push(blk); trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } } else if info == pg_constants::XLOG_XACT_PREPARE { @@ -932,6 +943,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM; blk.blkno = xlogrec.xl_xid; blk.will_init = true; + blocks.push(blk); + info!("Prepare transaction {}", xlogrec.xl_xid); } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 52e8218981..6f6062c6d2 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -287,9 +287,11 @@ impl PostgresRedoManagerInternal { } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; let mut status = 0; - if info == pg_constants::XLOG_XACT_COMMIT { + if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED { status = pg_constants::TRANSACTION_STATUS_COMMITTED; - transaction_id_set_status(xlogrec.xl_xid, status, &mut page); + if info == pg_constants::XLOG_XACT_COMMIT { + transaction_id_set_status(xlogrec.xl_xid, status, &mut page); + } //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; @@ -313,9 +315,38 @@ impl PostgresRedoManagerInternal { } } } - } else if info == pg_constants::XLOG_XACT_ABORT { + if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + //TODO handle this too? + trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + for _i in 0..nmsgs { + let sizeof_shared_invalidation_message = 0; + buf.advance(sizeof_shared_invalidation_message); + } + } + assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0); + let xid = buf.get_u32_le(); + transaction_id_set_status(xid, status, &mut page); + } + } else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED { status = pg_constants::TRANSACTION_STATUS_ABORTED; - transaction_id_set_status(xlogrec.xl_xid, status, &mut page); + if info == pg_constants::XLOG_XACT_ABORT { + transaction_id_set_status(xlogrec.xl_xid, status, &mut page); + } //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; @@ -339,8 +370,39 @@ impl PostgresRedoManagerInternal { } } } - } else if info != pg_constants::XLOG_XACT_PREPARE { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", + if info == pg_constants::XLOG_XACT_ABORT_PREPARED { + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + //TODO handle this too? + trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + for _i in 0..nmsgs { + let sizeof_shared_invalidation_message = 0; + buf.advance(sizeof_shared_invalidation_message); + } + } + assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0); + let xid = buf.get_u32_le(); + transaction_id_set_status(xid, status, &mut page); + } + } else if info == pg_constants::XLOG_XACT_PREPARE { + info!("Apply prepare {} record", xlogrec.xl_xid); + page.clear(); + page.extend_from_slice(&buf[..]); + } else { + error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", status, record.lsn, record.main_data_offset, record.rec.len()); diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 29d07b8c0c..41b4624cea 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10; pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_PREPARE: u8 = 0x10; pub const XLOG_XACT_ABORT: u8 = 0x20; +pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; +pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; // From srlu.h pub const SLRU_PAGES_PER_SEGMENT: u32 = 32; diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py new file mode 100644 index 0000000000..15638b8987 --- /dev/null +++ b/test_runner/batch_others/test_twophase.py @@ -0,0 +1,58 @@ +# +# Test branching, when a transaction is in prepared state +# +import pytest +import getpass +import psycopg2 + +pytest_plugins = ("fixtures.zenith_fixtures") + +def test_twophase(zenith_cli, pageserver, postgres, pg_bin): + zenith_cli.run(["branch", "test_twophase", "empty"]); + + pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5']) + print("postgres is running on 'test_twophase' branch") + + conn = psycopg2.connect(pg.connstr()); + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = conn.cursor() + + cur.execute('CREATE TABLE foo (t text)'); + + # Prepare a transaction that will insert a row + cur.execute('BEGIN'); + cur.execute("INSERT INTO foo VALUES ('one')"); + cur.execute("PREPARE TRANSACTION 'insert_one'"); + + # Prepare another transaction that will insert a row + cur.execute('BEGIN'); + cur.execute("INSERT INTO foo VALUES ('two')"); + cur.execute("PREPARE TRANSACTION 'insert_two'"); + + cur.execute('BEGIN'); + cur.execute("INSERT INTO foo VALUES ('three')"); + cur.execute("PREPARE TRANSACTION 'insert_three'"); + cur.execute("COMMIT PREPARED 'insert_three'"); + + cur.execute('SELECT pg_current_wal_insert_lsn()'); + lsn = cur.fetchone()[0] + + # Create a branch with the transaction in prepared state + zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@"+lsn]); + + pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5']) + conn2 = psycopg2.connect(pg2.connstr()); + conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur2 = conn2.cursor() + + # On the new branch, commit one of the prepared transactions, abort the other one. + cur2.execute("COMMIT PREPARED 'insert_one'"); + cur2.execute("ROLLBACK PREPARED 'insert_two'"); + + cur2.execute('SELECT * FROM foo'); + assert(cur2.fetchall() == [('one',),('three',)]); + + # Neither insert is visible on the original branch, the transactions are still + # in prepared state there. + cur.execute('SELECT * FROM foo'); + assert(cur.fetchall() == [('three',)]);