From 47ef9c7ef47e005eeae0ea84b9aa90859c8f9d4e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 4 Jun 2021 18:14:19 +0300 Subject: [PATCH] Fix various bugs caused by switch to new storage model --- pageserver/src/basebackup.rs | 17 +++-- pageserver/src/branches.rs | 6 +- pageserver/src/object_repository.rs | 104 +++++++++++++++------------ pageserver/src/repository.rs | 14 ++++ pageserver/src/restore_local_repo.rs | 51 +++++++++---- pageserver/src/waldecoder.rs | 2 +- pageserver/src/walreceiver.rs | 2 +- pageserver/src/walredo.rs | 6 +- 8 files changed, 127 insertions(+), 75 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index ad947c264f..d3fbf14c92 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -123,13 +123,12 @@ impl<'a> Basebackup<'a> { tag: &ObjectTag, page: u32, ) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; // Zero length image indicates truncated segment: just skip it if !img.is_empty() { assert!(img.len() == pg_constants::BLCKSZ as usize); - let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; - if self.slru_segno != segno || self.slru_path != path { + if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) { let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; self.ar.append(&header, &self.slru_buf[..])?; @@ -158,7 +157,8 @@ impl<'a> Basebackup<'a> { // Extract pg_filenode.map files from repository // fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + info!("add_relmap_file {:?}", db); let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { String::from("global/pg_filenode.map") } else { @@ -179,7 +179,7 @@ impl<'a> Basebackup<'a> { // Extract twophase state files // fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; let mut buf = BytesMut::new(); buf.extend_from_slice(&img[..]); let crc = crc32c::crc32c(&img[..]); @@ -197,13 +197,12 @@ impl<'a> Basebackup<'a> { let most_recent_lsn = Lsn(0); let checkpoint_bytes = self .timeline - .get_page_at_lsn(ObjectTag::Checkpoint, most_recent_lsn)?; + .get_page_at_lsn_nowait(ObjectTag::Checkpoint, most_recent_lsn)?; let pg_control_bytes = self .timeline - .get_page_at_lsn(ObjectTag::ControlFile, most_recent_lsn)?; + .get_page_at_lsn_nowait(ObjectTag::ControlFile, most_recent_lsn)?; let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?; let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; - // Here starts pg_resetwal inspired magic // Generate new pg_control and WAL needed for bootstrap let new_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE) + 1; @@ -241,7 +240,7 @@ impl<'a> Basebackup<'a> { let hdr = XLogLongPageHeaderData { std: { XLogPageHeaderData { - xlp_magic: XLOG_PAGE_MAGIC, + xlp_magic: XLOG_PAGE_MAGIC as u16, xlp_info: pg_constants::XLP_LONG_HEADER, xlp_tli: 1, // FIXME: always use Postgres timeline 1 xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64, diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 6efca05a0c..afb32143a6 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -127,9 +127,6 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; - force_crash_recovery(&tmppath)?; - println!("updated pg_control"); - // Move the data directory as an initial base backup. // FIXME: It would be enough to only copy the non-relation files here, the relation // data was already loaded into the repository. @@ -345,6 +342,7 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } +<<<<<<< HEAD // If control file says the cluster was shut down cleanly, modify it, to mark // it as crashed. That forces crash recovery when you start the cluster. // @@ -366,6 +364,8 @@ fn force_crash_recovery(datadir: &Path) -> Result<()> { Ok(()) } +======= +>>>>>>> Fix various bugs caused by switch to new storage model fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { // Create initial timeline let mut tli_buf = [0u8; 16]; diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index b3f4a5e3d0..ec7f1d31f6 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -138,8 +138,7 @@ impl Repository for ObjectRepository { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { - // just to check the source timeline exists - let _ = self.get_timeline(src)?; + let src_timeline = self.get_timeline(src)?; // Write a metadata key, noting the ancestor of th new timeline. There is initially // no data in it, but all the read-calls know to look into the ancestor. @@ -156,6 +155,19 @@ impl Repository for ObjectRepository { &ObjectValue::ser(&val)?, )?; + // Copy non-rel objects + for tag in src_timeline.list_nonrels(at_lsn)? { + match tag { + ObjectTag::TimelineMetadataTag => {} // skip it + _ => { + let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?; + let val = ObjectValue::Page(img); + let key = ObjectKey { timeline: dst, tag }; + let lsn = if tag.is_versioned() { at_lsn } else { Lsn(0) }; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; + } + } + } Ok(()) } } @@ -255,6 +267,50 @@ impl Timeline for ObjectTimeline { self.get_page_at_lsn_nowait(tag, lsn) } + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result { + // Look up the page entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let searchkey = ObjectKey { + timeline: self.timelineid, + tag, + }; + let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; + + if let Some((version_lsn, value)) = iter.next().transpose()? { + let page_img: Bytes; + + match ObjectValue::des(&value)? { + ObjectValue::Page(img) => { + page_img = img; + } + ObjectValue::WALRecord(_rec) => { + // Request the WAL redo manager to apply the WAL records for us. + let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; + page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; + + self.put_page_image(tag, lsn, page_img.clone())?; + } + x => bail!("Unexpected object value: {:?}", x), + } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + trace!( + "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})", + page_lsn_hi, + page_lsn_lo, + tag, + version_lsn, + lsn + ); + return Ok(page_img); + } + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + trace!("page {:?} at {} not found", tag, lsn); + Ok(Bytes::from_static(&ZERO_PAGE)) + /* return Err("could not find page image")?; */ + } + /// Get size of relation fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { let lsn = self.wait_lsn(lsn)?; @@ -553,50 +609,6 @@ impl Timeline for ObjectTimeline { } impl ObjectTimeline { - fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result { - // Look up the page entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let searchkey = ObjectKey { - timeline: self.timelineid, - tag, - }; - let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; - - if let Some((version_lsn, value)) = iter.next().transpose()? { - let page_img: Bytes; - - match ObjectValue::des(&value)? { - ObjectValue::Page(img) => { - page_img = img; - } - ObjectValue::WALRecord(_rec) => { - // Request the WAL redo manager to apply the WAL records for us. - let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; - page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - - self.put_page_image(tag, lsn, page_img.clone())?; - } - x => bail!("Unexpected object value: {:?}", x), - } - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - trace!( - "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})", - page_lsn_hi, - page_lsn_lo, - tag, - version_lsn, - lsn - ); - return Ok(page_img); - } - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - trace!("page {:?} at {} not found", tag, lsn); - Ok(Bytes::from_static(&ZERO_PAGE)) - /* return Err("could not find page image")?; */ - } - /// /// Internal function to get relation size at given LSN. /// diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 1caa35840c..475e955bb8 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -38,6 +38,9 @@ pub trait Timeline: Send + Sync { /// Look up given page in the cache. fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result; + /// Look up given page in the cache. + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result; + /// Get size of relation fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result; @@ -273,6 +276,17 @@ pub enum ObjectTag { RelationBuffer(BufferTag), } +impl ObjectTag { + pub fn is_versioned(&self) -> bool { + match self { + ObjectTag::Checkpoint => false, + ObjectTag::ControlFile => false, + ObjectTag::TimelineMetadataTag => false, + _ => true, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 5ffe704fa2..d8cf0918ca 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -254,7 +254,6 @@ fn import_slru_file( let mut file = File::open(path)?; let mut buf: [u8; 8192] = [0u8; 8192]; let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; - let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; loop { let r = file.read_exact(&mut buf); @@ -291,9 +290,13 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let pg_control_bytes = timeline.get_page_at_lsn(ObjectTag::ControlFile, Lsn(0))?; - let pg_control = decode_pg_control(pg_control_bytes)?; - let mut checkpoint = pg_control.checkPointCopy.clone(); + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?; + let mut checkpoint = decode_checkpoint(checkpoint_bytes)?; + if checkpoint.nextXid.value == 0 { + let pg_control_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, Lsn(0))?; + let pg_control = decode_pg_control(pg_control_bytes)?; + checkpoint = pg_control.checkPointCopy; + } loop { // FIXME: assume postgresql tli 1 for now @@ -375,14 +378,17 @@ pub fn save_decoded_record( // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - let rec = WALRecord { - lsn, - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - - timeline.put_wal_record(blk.tag, rec)?; + if blk.will_drop { + timeline.put_unlink(blk.tag, lsn)?; + } else { + let rec = WALRecord { + lsn, + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record(blk.tag, rec)?; + } } // Handle a few special record types @@ -447,9 +453,9 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab blknum, }); - let content = timeline.get_page_at_lsn(src_key, req_lsn)?; + let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?; - info!("copying block {:?} to {:?}", src_key, dst_key); + debug!("copying block {:?} to {:?}", src_key, dst_key); timeline.put_page_image(dst_key, lsn, content)?; num_blocks_copied += 1; @@ -462,6 +468,23 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab num_rels_copied += 1; } + // Copy relfilemap + for tag in timeline.list_nonrels(req_lsn)? { + match tag { + ObjectTag::FileNodeMap(db) => { + if db.spcnode == src_tablespace_id && db.dbnode == src_db_id { + let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?; + let new_tag = ObjectTag::FileNodeMap(DatabaseTag { + spcnode: tablespace_id, + dbnode: db_id, + }); + timeline.put_page_image(new_tag, lsn, img)?; + break; + } + } + _ => {} // do nothing + } + } info!( "Created database {}/{}, copied {} blocks in {} rels at {}", tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 7072cdbbd8..07975d8872 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -903,7 +903,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW }); blk.will_init = true; blocks.push(blk); - info!("Prepare transaction {}", xlogrec.xl_xid); + debug!("Prepare transaction {}", xlogrec.xl_xid); } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 22c7bfe6ff..84ca9f8db7 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -174,7 +174,7 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); - let checkpoint_bytes = timeline.get_page_at_lsn(ObjectTag::Checkpoint, Lsn(0))?; + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?; let mut checkpoint = decode_checkpoint(checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c9140c50e1..25f9514831 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -305,7 +305,11 @@ impl PostgresRedoManagerInternal { let mut status = 0; let tag_blknum = match tag { ObjectTag::Clog(slru) => slru.blknum, - _ => panic!("Not CLOG object tag"), + ObjectTag::TwoPhase(_) => { + assert!(info == pg_constants::XLOG_XACT_PREPARE); + 0 // not used by XLOG_XACT_PREPARE + } + _ => panic!("Not valid XACT object tag {:?}", tag), }; if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED