From ced338fd2086c1957800b73b4254d3120fdfe93f Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 28 Jun 2021 18:50:10 +0300 Subject: [PATCH] Handle relation DROPs in page server. Add back code to parse transaction commit and abort records, and in particular the list of dropped relations in them. Add 'put_unlink' function to the Timeline trait and implementation. We had the code to handle dropped relations in the GC code and elsewhere in ObjectRepository already, but there was nothing to create the RelationSizeEntry::Unlink tombstone entries until now. Also add a test to check that GC correctly removes all page versions of a dropped relation. Implements https://github.com/zenithdb/zenith/issues/232, except for the "orphaned" rels. Reviewed-by: Konstantin Knizhnik --- pageserver/src/object_repository.rs | 13 ++++ pageserver/src/repository.rs | 3 + pageserver/src/restore_local_repo.rs | 30 ++++++++- pageserver/src/waldecoder.rs | 99 ++++++++++++++++++++++++++++ postgres_ffi/src/pg_constants.rs | 20 ++++++ test_runner/batch_others/test_gc.py | 11 ++++ 6 files changed, 175 insertions(+), 1 deletion(-) diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index bb7dba43f7..9f841925b3 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -370,6 +370,19 @@ impl Timeline for ObjectTimeline { Ok(()) } + /// Unlink object. This method is used for marking dropped relations. + fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag: ObjectTag::RelationMetadata(rel_tag), + }; + let val = RelationSizeEntry::Unlink; + self.obj_store + .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; + + Ok(()) + } + /// /// Memorize a full image of a page version /// diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3bf725bbf9..595b6c593c 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -76,6 +76,9 @@ pub trait Timeline: Send + Sync { /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; + /// Unlink object. This method is used for marking dropped relations. + fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>; + /// Remember the all WAL before the given LSN has been processed. /// /// The WAL receiver calls this after the put_* functions, to indicate that diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 9e480a5135..4999a6b934 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; -use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate}; +use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate, XlXactParsedRecord}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::pg_constants; @@ -292,6 +292,16 @@ pub fn save_decoded_record( } } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_XACT_COMMIT + || info == pg_constants::XLOG_XACT_COMMIT_PREPARED + || info == pg_constants::XLOG_XACT_ABORT + || info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + let parsed_xact = XlXactParsedRecord::decode(&decoded); + save_xact_record(timeline, lsn, &parsed_xact)?; + } } // Now that this record has been handled, let the repository know that @@ -426,3 +436,21 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca } Ok(()) } + +/// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records. +/// +/// We are currently only interested in the dropped relations. +fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord) -> Result<()> { + for xnode in &rec.xnodes { + for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { + let rel_tag = RelTag { + forknum, + spcnode: xnode.spcnode, + dbnode: xnode.dbnode, + relnode: xnode.relnode, + }; + timeline.put_unlink(rel_tag, lsn)?; + } + } + Ok(()) +} diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index a21936880f..cfb9f1355c 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -18,6 +18,7 @@ pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; +pub type TimestampTz = i64; #[allow(dead_code)] pub struct WalStreamDecoder { @@ -382,6 +383,104 @@ impl XlHeapUpdate { } } +/// +/// Note: Parsing some fields is missing, because they're not needed. +/// +/// This is similar to the xl_xact_parsed_commit and +/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same +/// struct for commits and aborts. +/// +#[derive(Debug)] +pub struct XlXactParsedRecord { + pub info: u8, + pub xact_time: TimestampTz, + pub xinfo: u32, + + pub db_id: Oid, /* MyDatabaseId */ + pub ts_id: Oid, /* MyDatabaseTableSpace */ + + pub subxacts: Vec, + + pub xnodes: Vec, +} + +impl XlXactParsedRecord { + /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED + /// record. This should agree with the ParseCommitRecord and ParseAbortRecord + /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) + pub fn decode(decoded: &DecodedWALRecord) -> XlXactParsedRecord { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + + // The record starts with time of commit/abort + let xact_time = buf.get_i64_le(); + let xinfo; + if decoded.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + } else { + xinfo = 0; + } + let db_id; + let ts_id; + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + db_id = buf.get_u32_le(); + ts_id = buf.get_u32_le(); + } else { + db_id = 0; + ts_id = 0; + } + let mut subxacts = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + subxacts.push(subxact); + } + } + let mut xnodes = Vec::::new(); + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + xnodes.push(RelFileNode { + spcnode, + dbnode, + relnode, + }); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + for _i in 0..nmsgs { + let sizeof_shared_invalidation_message = 0; + buf.advance(sizeof_shared_invalidation_message); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { + let _xid = buf.get_u32_le(); + trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); + } + XlXactParsedRecord { + info, + xact_time, + xinfo, + db_id, + ts_id, + subxacts, + xnodes, + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index ecd7fe3919..996e5e978e 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -33,11 +33,31 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; +// From xact.h +pub const XLOG_XACT_COMMIT: u8 = 0x00; +pub const XLOG_XACT_PREPARE: u8 = 0x10; +pub const XLOG_XACT_ABORT: u8 = 0x20; +pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; +pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; + /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ pub const XLOG_XACT_HAS_INFO: u8 = 0x80; +/* + * The following flags, stored in xinfo, determine which information is + * contained in commit/abort records. + */ +pub const XACT_XINFO_HAS_DBINFO: u32 = 1u32 << 0; +pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1; +pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2; +pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3; +pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; +// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; +// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6; +// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; + // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index e9235bb697..773fcb5949 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -84,3 +84,14 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): assert row['dropped'] == 0 assert row['truncated'] == 0 assert row['deleted'] == 0 + + # + # Test DROP TABLE checks that relation data and metadata was deleted by GC from object storage + # + cur.execute("DROP TABLE foo") + + pscur.execute(f"do_gc {timeline} 0") + row = pscur.fetchone() + print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) + # Each relation fork is counted separately, hence 3. + assert row['dropped'] == 3