diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index a9173b41e9..4189200d5c 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -98,261 +98,258 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } - // Heap AM records need some special handling, because they modify VM pages - // without registering them with the standard mechanism. - if decoded.xl_rmid == pg_constants::RM_HEAP_ID - || decoded.xl_rmid == pg_constants::RM_HEAP2_ID - { - self.ingest_heapam_record(&mut buf, modification, decoded, ctx) - .await?; - } - if decoded.xl_rmid == pg_constants::RM_NEON_ID { - self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) - .await?; - } - // Handle other special record types - if decoded.xl_rmid == pg_constants::RM_SMGR_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_SMGR_CREATE - { - let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(modification, &create, ctx) - .await?; - } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_SMGR_TRUNCATE - { - let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) - .await?; - } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { - debug!( - "handle RM_DBASE_ID for Postgres version {:?}", - self.timeline.pg_version - ); - if self.timeline.pg_version == 14 { - if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE - { - let createdb = XlCreateDatabase::decode(&mut buf); - debug!("XLOG_DBASE_CREATE v14"); + match decoded.xl_rmid { + pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { + // Heap AM records need some special handling, because they modify VM pages + // without registering them with the standard mechanism. + self.ingest_heapam_record(&mut buf, modification, decoded, ctx) + .await?; + } + pg_constants::RM_NEON_ID => { + self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) + .await?; + } + // Handle other special record types + pg_constants::RM_SMGR_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - self.ingest_xlog_dbase_create(modification, &createdb, ctx) + if info == pg_constants::XLOG_SMGR_CREATE { + let create = XlSmgrCreate::decode(&mut buf); + self.ingest_xlog_smgr_create(modification, &create, ctx) .await?; - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v14::bindings::XLOG_DBASE_DROP - { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if self.timeline.pg_version == 15 { - if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG - { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY - { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) + } else if info == pg_constants::XLOG_SMGR_TRUNCATE { + let truncate = XlSmgrTruncate::decode(&mut buf); + self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) .await?; - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v15::bindings::XLOG_DBASE_DROP - { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if self.timeline.pg_version == 16 { - if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG - { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY - { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == postgres_ffi::v16::bindings::XLOG_DBASE_DROP - { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } } } - } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { - trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); - } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID { - let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::CLOG_ZEROPAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::Clog, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); - let xlrec = XlClogTruncate::decode(&mut buf); - self.ingest_clog_truncate_record(modification, &xlrec, ctx) - .await?; - } - } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT, - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED - || info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT_PREPARED, - ctx, - ) - .await?; - // Remove twophase file. see RemoveTwoPhaseFile() in postgres code - trace!( - "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", - decoded.xl_xid, - parsed_xact.xid, - lsn, - ); - modification - .drop_twophase_file(parsed_xact.xid, ctx) - .await?; - } else if info == pg_constants::XLOG_XACT_PREPARE { - modification - .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) - .await?; - } - } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + pg_constants::RM_DBASE_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + debug!(%info, pg_version=%self.timeline.pg_version, "handle RM_DBASE_ID"); - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactOffsets, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactMembers, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(modification, &xlrec)?; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(modification, &xlrec, ctx) + if self.timeline.pg_version == 14 { + if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(&mut buf); + debug!("XLOG_DBASE_CREATE v14"); + + self.ingest_xlog_dbase_create(modification, &createdb, ctx) + .await?; + } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(&mut buf); + for tablespace_id in dropdb.tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); + modification + .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + .await?; + } + } + } else if self.timeline.pg_version == 15 { + if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + let createdb = XlCreateDatabase::decode(&mut buf); + self.ingest_xlog_dbase_create(modification, &createdb, ctx) + .await?; + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(&mut buf); + for tablespace_id in dropdb.tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); + modification + .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + .await?; + } + } + } else if self.timeline.pg_version == 16 { + if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + let createdb = XlCreateDatabase::decode(&mut buf); + self.ingest_xlog_dbase_create(modification, &createdb, ctx) + .await?; + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(&mut buf); + for tablespace_id in dropdb.tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); + modification + .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + .await?; + } + } + } + } + pg_constants::RM_TBLSPC_ID => { + trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + } + pg_constants::RM_CLOG_ID => { + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + + if info == pg_constants::CLOG_ZEROPAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + self.put_slru_page_image( + modification, + SlruKind::Clog, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) .await?; - } - } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { - let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, decoded, ctx) - .await?; - } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if self.checkpoint.nextOid != next_oid { - self.checkpoint.nextOid = next_oid; - self.checkpoint_modified = true; + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + let xlrec = XlClogTruncate::decode(&mut buf); + self.ingest_clog_truncate_record(modification, &xlrec, ctx) + .await?; } - } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE - || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; - buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - trace!( - "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", - xlog_checkpoint.oldestXid, - self.checkpoint.oldestXid - ); - if (self - .checkpoint - .oldestXid - .wrapping_sub(xlog_checkpoint.oldestXid) as i32) - < 0 + } + pg_constants::RM_XACT_ID => { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + + if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); + self.ingest_xact_record( + modification, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT, + ctx, + ) + .await?; + } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED + || info == pg_constants::XLOG_XACT_ABORT_PREPARED { - self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; - self.checkpoint_modified = true; - } - } - } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_LOGICAL_MESSAGE { - let xlrec = XlLogicalMessage::decode(&mut buf); - let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; - let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; - if prefix == "neon-test" { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - crate::failpoint_support::sleep_millis_async!( - "wal-ingest-logical-message-sleep" + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); + self.ingest_xact_record( + modification, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT_PREPARED, + ctx, + ) + .await?; + // Remove twophase file. see RemoveTwoPhaseFile() in postgres code + trace!( + "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", + decoded.xl_xid, + parsed_xact.xid, + lsn, ); - } else if let Some(path) = prefix.strip_prefix("neon-file:") { - modification.put_file(path, message, ctx).await?; + modification + .drop_twophase_file(parsed_xact.xid, ctx) + .await?; + } else if info == pg_constants::XLOG_XACT_PREPARE { + modification + .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) + .await?; } } + pg_constants::RM_MULTIXACT_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + self.put_slru_page_image( + modification, + SlruKind::MultiXactOffsets, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await?; + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + self.put_slru_page_image( + modification, + SlruKind::MultiXactMembers, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await?; + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + self.ingest_multixact_create_record(modification, &xlrec)?; + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(&mut buf); + self.ingest_multixact_truncate_record(modification, &xlrec, ctx) + .await?; + } + } + pg_constants::RM_RELMAP_ID => { + let xlrec = XlRelmapUpdate::decode(&mut buf); + self.ingest_relmap_page(modification, &xlrec, decoded, ctx) + .await?; + } + pg_constants::RM_XLOG_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if self.checkpoint.nextOid != next_oid { + self.checkpoint.nextOid = next_oid; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + self.checkpoint.oldestXid + ); + if (self + .checkpoint + .oldestXid + .wrapping_sub(xlog_checkpoint.oldestXid) as i32) + < 0 + { + self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; + self.checkpoint_modified = true; + } + } + } + pg_constants::RM_LOGICALMSG_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + let xlrec = XlLogicalMessage::decode(&mut buf); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; + if prefix == "neon-test" { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + crate::failpoint_support::sleep_millis_async!( + "wal-ingest-logical-message-sleep" + ); + } else if let Some(path) = prefix.strip_prefix("neon-file:") { + modification.put_file(path, message, ctx).await?; + } + } + } + _x => { + // TODO: should probably log & fail here instead of blindly + // doing something without understanding the protocol + } } // Iterate through all the blocks that the record modifies, and