diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index bb7dba43f7..9f841925b3 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -370,6 +370,19 @@ impl Timeline for ObjectTimeline { Ok(()) } + /// Unlink object. This method is used for marking dropped relations. + fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag: ObjectTag::RelationMetadata(rel_tag), + }; + let val = RelationSizeEntry::Unlink; + self.obj_store + .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; + + Ok(()) + } + /// /// Memorize a full image of a page version /// diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3bf725bbf9..595b6c593c 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -76,6 +76,9 @@ pub trait Timeline: Send + Sync { /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; + /// Unlink object. This method is used for marking dropped relations. + fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>; + /// Remember the all WAL before the given LSN has been processed. /// /// The WAL receiver calls this after the put_* functions, to indicate that diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 9e480a5135..4999a6b934 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; -use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate}; +use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate, XlXactParsedRecord}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::pg_constants; @@ -292,6 +292,16 @@ pub fn save_decoded_record( } } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_XACT_COMMIT + || info == pg_constants::XLOG_XACT_COMMIT_PREPARED + || info == pg_constants::XLOG_XACT_ABORT + || info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + let parsed_xact = XlXactParsedRecord::decode(&decoded); + save_xact_record(timeline, lsn, &parsed_xact)?; + } } // Now that this record has been handled, let the repository know that @@ -426,3 +436,21 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca } Ok(()) } + +/// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records. +/// +/// We are currently only interested in the dropped relations. +fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord) -> Result<()> { + for xnode in &rec.xnodes { + for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { + let rel_tag = RelTag { + forknum, + spcnode: xnode.spcnode, + dbnode: xnode.dbnode, + relnode: xnode.relnode, + }; + timeline.put_unlink(rel_tag, lsn)?; + } + } + Ok(()) +} diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index a21936880f..cfb9f1355c 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -18,6 +18,7 @@ pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; +pub type TimestampTz = i64; #[allow(dead_code)] pub struct WalStreamDecoder { @@ -382,6 +383,104 @@ impl XlHeapUpdate { } } +/// +/// Note: Parsing some fields is missing, because they're not needed. +/// +/// This is similar to the xl_xact_parsed_commit and +/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same +/// struct for commits and aborts. +/// +#[derive(Debug)] +pub struct XlXactParsedRecord { + pub info: u8, + pub xact_time: TimestampTz, + pub xinfo: u32, + + pub db_id: Oid, /* MyDatabaseId */ + pub ts_id: Oid, /* MyDatabaseTableSpace */ + + pub subxacts: Vec, + + pub xnodes: Vec, +} + +impl XlXactParsedRecord { + /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED + /// record. This should agree with the ParseCommitRecord and ParseAbortRecord + /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) + pub fn decode(decoded: &DecodedWALRecord) -> XlXactParsedRecord { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + + // The record starts with time of commit/abort + let xact_time = buf.get_i64_le(); + let xinfo; + if decoded.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + } else { + xinfo = 0; + } + let db_id; + let ts_id; + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + db_id = buf.get_u32_le(); + ts_id = buf.get_u32_le(); + } else { + db_id = 0; + ts_id = 0; + } + let mut subxacts = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + subxacts.push(subxact); + } + } + let mut xnodes = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + xnodes.push(RelFileNode { + spcnode, + dbnode, + relnode, + }); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + for _i in 0..nmsgs { + let sizeof_shared_invalidation_message = 0; + buf.advance(sizeof_shared_invalidation_message); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { + let _xid = buf.get_u32_le(); + trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); + } + XlXactParsedRecord { + info, + xact_time, + xinfo, + db_id, + ts_id, + subxacts, + xnodes, + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index ecd7fe3919..996e5e978e 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -33,11 +33,31 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; +// From xact.h +pub const XLOG_XACT_COMMIT: u8 = 0x00; +pub const XLOG_XACT_PREPARE: u8 = 0x10; +pub const XLOG_XACT_ABORT: u8 = 0x20; +pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; +pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; + /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ pub const XLOG_XACT_HAS_INFO: u8 = 0x80; +/* + * The following flags, stored in xinfo, determine which information is + * contained in commit/abort records. + */ +pub const XACT_XINFO_HAS_DBINFO: u32 = 1u32 << 0; +pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1; +pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2; +pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3; +pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; +// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; +// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6; +// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; + // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index e9235bb697..773fcb5949 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -84,3 +84,14 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): assert row['dropped'] == 0 assert row['truncated'] == 0 assert row['deleted'] == 0 + + # + # Test DROP TABLE checks that relation data and metadata was deleted by GC from object storage + # + cur.execute("DROP TABLE foo") + + pscur.execute(f"do_gc {timeline} 0") + row = pscur.fetchone() + print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) + # Each relation fork is counted separately, hence 3. + assert row['dropped'] == 3