From cb6e2d9ddba63913690fb34e38f969f5b5ec07d5 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 27 May 2021 09:44:46 +0300 Subject: [PATCH] Minor refactoring and cleanup of the Timeline interface. Move `save_decoded_record` out of the Timeline trait. The storage implementation shouldn't need to know how to decode records. Also move put_create_database() out of the Timeline trait. Add a new `list_rels` function to Timeline to support it, instead. Rename `get_relsize` to `get_rel_size`, and `get_relsize_exists` to `get_rel_exists`. Seems nicer. --- pageserver/src/page_service.rs | 4 +- pageserver/src/repository.rs | 114 ++++----------------- pageserver/src/repository/rocksdb.rs | 91 ++++++++--------- pageserver/src/restore_local_repo.rs | 143 ++++++++++++++++++++++++++- pageserver/src/walreceiver.rs | 3 +- 5 files changed, 204 insertions(+), 151 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9c24504f77..51f3bc8420 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -835,7 +835,7 @@ impl Connection { forknum: req.forknum, }; - let exist = timeline.get_relsize_exists(tag, req.lsn).unwrap_or(false); + let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false); PagestreamBeMessage::Status(PagestreamStatusResponse { ok: exist, @@ -850,7 +850,7 @@ impl Connection { forknum: req.forknum, }; - let n_blocks = timeline.get_relsize(tag, req.lsn).unwrap_or(0); + let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0); PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f923b3a27a..3da371f497 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,12 +1,11 @@ pub mod rocksdb; -use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate}; use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; 
-use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::forknumber_to_name; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt; use std::sync::Arc; use zenith_utils::lsn::Lsn; @@ -40,10 +39,13 @@ pub trait Timeline { fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result; /// Get size of relation - fn get_relsize(&self, tag: RelTag, lsn: Lsn) -> Result; + fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result; /// Does relation exist? - fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result; + fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result; + + /// Get a list of all distinct relations in given tablespace and database. + fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result>; //------------------------------------------------------------------------------ // Public PUT functions, to update the repository with new page versions. @@ -63,87 +65,6 @@ pub trait Timeline { /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; - /// Create a new database from a template database - /// - /// In PostgreSQL, CREATE DATABASE works by scanning the data directory and - /// copying all relation files from the template database. This is the equivalent - /// of that. - fn put_create_database( - &self, - lsn: Lsn, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> Result<()>; - - /// - /// Helper function to parse a WAL record and call the above functions for all the - /// relations/pages that the record affects. - /// - fn save_decoded_record( - &self, - decoded: DecodedWALRecord, - recdata: Bytes, - lsn: Lsn, - ) -> Result<()> { - // Figure out which blocks the record applies to, and "put" a separate copy - // of the record for each block. 
- for blk in decoded.blocks.iter() { - let tag = BufferTag { - rel: RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }, - blknum: blk.blkno, - }; - - let rec = WALRecord { - lsn, - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - - self.put_wal_record(tag, rec); - } - - // Handle a few special record types - if decoded.xl_rmid == pg_constants::RM_SMGR_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_SMGR_TRUNCATE - { - let truncate = XlSmgrTruncate::decode(&decoded); - if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { - let rel = RelTag { - spcnode: truncate.rnode.spcnode, - dbnode: truncate.rnode.dbnode, - relnode: truncate.rnode.relnode, - forknum: pg_constants::MAIN_FORKNUM, - }; - self.put_truncation(rel, lsn, truncate.blkno)?; - } - } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_DBASE_CREATE - { - let createdb = XlCreateDatabase::decode(&decoded); - self.put_create_database( - lsn, - createdb.db_id, - createdb.tablespace_id, - createdb.src_db_id, - createdb.src_tablespace_id, - )?; - } - // Now that this record has been handled, let the repository know that - // it is up-to-date to this LSN - self.advance_last_record_lsn(lsn); - Ok(()) - } - /// Remember the all WAL before the given LSN has been processed. 
/// /// The WAL receiver calls this after the put_* functions, to indicate that @@ -359,13 +280,14 @@ mod tests { // FIXME: The rocksdb implementation erroneously returns 'true' here, even // though the relation was created only at a later LSN // rocksdb implementation erroneosly returns 'true' here - assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false - // And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet - assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error + assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false - assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(2))?, true); - assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 1); - assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3); + // And this probably should throw an error, because the relation doesn't exist at Lsn(1) yet + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error + + assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(2))?, 1); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(5))?, 3); // Check page contents at each LSN assert_eq!( @@ -405,7 +327,7 @@ mod tests { tline.advance_last_valid_lsn(Lsn(6)); // Check reported size and contents after truncation - assert_eq!(tline.get_relsize(TESTREL_A, Lsn(6))?, 2); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(6))?, 2); assert_eq!( tline.get_page_at_lsn(TEST_BUF(0), Lsn(6))?, TEST_IMG("foo blk 0 at 3") @@ -416,7 +338,7 @@ mod tests { ); // should still see the truncated block with older LSN - assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(5))?, 3); assert_eq!( tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?, TEST_IMG("foo blk 2 at 5") @@ -447,7 +369,7 @@ mod tests { tline.advance_last_valid_lsn(Lsn(lsn)); assert_eq!( - tline.get_relsize(TESTREL_A, Lsn(lsn))?, + tline.get_rel_size(TESTREL_A, Lsn(lsn))?, 
pg_constants::RELSEG_SIZE + 1 ); @@ -456,7 +378,7 @@ mod tests { tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?; tline.advance_last_valid_lsn(Lsn(lsn)); assert_eq!( - tline.get_relsize(TESTREL_A, Lsn(lsn))?, + tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE ); @@ -465,7 +387,7 @@ mod tests { tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?; tline.advance_last_valid_lsn(Lsn(lsn)); assert_eq!( - tline.get_relsize(TESTREL_A, Lsn(lsn))?, + tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE - 1 ); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 6f51fefcc0..c561cd8f07 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -7,7 +7,6 @@ use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; use crate::restore_local_repo::import_timeline_wal; -use crate::waldecoder::Oid; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::ZTimelineId; @@ -18,6 +17,7 @@ use postgres_ffi::pg_constants; use serde::{Deserialize, Serialize}; use std::cmp::min; use std::collections::HashMap; +use std::collections::HashSet; use std::convert::TryInto; use std::str::FromStr; use std::sync::atomic::AtomicU64; @@ -714,7 +714,7 @@ impl Timeline for RocksTimeline { /// /// Get size of relation at given LSN. /// - fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result { + fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { let lsn = self.wait_lsn(lsn)?; self.relsize_get_nowait(rel, lsn) } @@ -723,7 +723,7 @@ impl Timeline for RocksTimeline { /// Does relation exist at given LSN? 
/// /// FIXME: this actually returns true, if the relation exists at *any* LSN - fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> { + fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> { let lsn = self.wait_lsn(req_lsn)?; let key = CacheKey { @@ -746,6 +746,46 @@ impl Timeline for RocksTimeline { Ok(false) } + /// Get a list of all distinct relations in given tablespace and database. + /// + /// TODO: This implementation is very inefficient, it scans + /// through all entries in the given database. In practice, this + /// is used for CREATE DATABASE, and usually the template database is small. + /// But if it's not, this will be slow. + fn list_rels<'a>(&'a self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> { + trace!("list_rels spcnode {} dbnode {} at {}", spcnode, dbnode, lsn); + + let mut rels: HashSet<RelTag> = HashSet::new(); + + let searchkey = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: spcnode, + dbnode: dbnode, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut iter = self.db.raw_iterator(); + iter.seek(searchkey.ser()?); + while iter.valid() { + let key = CacheKey::des(iter.key().unwrap())?; + if key.tag.rel.spcnode != spcnode || key.tag.rel.dbnode != dbnode { + break; + } + + if key.lsn < lsn { + rels.insert(key.tag.rel); + } + iter.next(); + } + + Ok(rels) + } + // Other public functions, for updating the repository. // These are used by the WAL receiver and WAL redo. 
@@ -832,51 +872,6 @@ impl Timeline for RocksTimeline { self.num_page_images.fetch_add(1, Ordering::Relaxed); } - fn put_create_database( - &self, - lsn: Lsn, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> Result<()> { - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(key.ser()?); - let mut n = 0; - while iter.valid() { - let mut key = CacheKey::des(iter.key().unwrap())?; - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - - let v = iter.value().unwrap(); - self.db.put(key.ser()?, v)?; - n += 1; - iter.next(); - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); - Ok(()) - } - /// Remember that WAL has been received and added to the timeline up to the given LSN fn advance_last_valid_lsn(&self, lsn: Lsn) { let old = self.last_valid_lsn.advance(lsn); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 9610c1b2cb..4cfcdc9898 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -3,7 +3,7 @@ //! zenith repository //! 
use log::*; -use std::cmp::max; +use std::cmp::{max, min}; use std::fs; use std::fs::File; use std::io::Read; @@ -14,8 +14,9 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use bytes::Bytes; -use crate::repository::{BufferTag, RelTag, Timeline}; -use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder}; +use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; +use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; +use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::pg_constants; @@ -220,7 +221,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: } if let Some((lsn, recdata)) = rec.unwrap() { let decoded = decode_wal_record(recdata.clone()); - timeline.save_decoded_record(decoded, recdata, lsn)?; + save_decoded_record(timeline, decoded, recdata, lsn)?; last_lsn = lsn; } else { break; @@ -241,3 +242,137 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: info!("reached end of WAL at {}", last_lsn); Ok(()) } + +/// +/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the +/// relations/pages that the record affects. +/// +pub fn save_decoded_record( + timeline: &dyn Timeline, + decoded: DecodedWALRecord, + recdata: Bytes, + lsn: Lsn, +) -> Result<()> { + // Figure out which blocks the record applies to, and "put" a separate copy + // of the record for each block. 
+ for blk in decoded.blocks.iter() { + let tag = BufferTag { + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }, + blknum: blk.blkno, + }; + + let rec = WALRecord { + lsn, + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + + timeline.put_wal_record(tag, rec); + } + + // Handle a few special record types + if decoded.xl_rmid == pg_constants::RM_SMGR_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE + { + let truncate = XlSmgrTruncate::decode(&decoded); + if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let rel = RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: pg_constants::MAIN_FORKNUM, + }; + timeline.put_truncation(rel, lsn, truncate.blkno)?; + } + } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE + { + let createdb = XlCreateDatabase::decode(&decoded); + save_create_database( + timeline, + lsn, + createdb.db_id, + createdb.tablespace_id, + createdb.src_db_id, + createdb.src_tablespace_id, + )?; + } + + // Now that this record has been handled, let the repository know that + // it is up-to-date to this LSN + timeline.advance_last_record_lsn(lsn); + Ok(()) +} + +/// Subroutine of save_decoded_record(), to handle an XLOG_DBASE_CREATE record. +fn save_create_database( + timeline: &dyn Timeline, + lsn: Lsn, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, +) -> Result<()> { + // Creating a database is implemented by copying the template (aka. source) database. 
+ // To copy all the relations, we need to ask for the state as of the same LSN, but we + // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for + // the last valid LSN to advance up to it. So we use the previous record's LSN in the + // get calls instead. + let req_lsn = min(timeline.get_last_record_lsn(), lsn); + + let rels = timeline.list_rels(src_tablespace_id, src_db_id, req_lsn)?; + + info!("creatdb: {} rels", rels.len()); + + let mut num_rels_copied = 0; + let mut num_blocks_copied = 0; + for src_rel in rels { + assert_eq!(src_rel.spcnode, src_tablespace_id); + assert_eq!(src_rel.dbnode, src_db_id); + + let nblocks = timeline.get_rel_size(src_rel, req_lsn)?; + let dst_rel = RelTag { + spcnode: tablespace_id, + dbnode: db_id, + relnode: src_rel.relnode, + forknum: src_rel.forknum, + }; + + // Copy content + for blknum in 0..nblocks { + let src_key = BufferTag { + rel: src_rel, + blknum, + }; + let dst_key = BufferTag { + rel: dst_rel, + blknum, + }; + + let content = timeline.get_page_at_lsn(src_key, req_lsn)?; + + info!("copying block {:?} to {:?}", src_key, dst_key); + + timeline.put_page_image(dst_key, lsn, content); + num_blocks_copied += 1; + } + + if nblocks == 0 { + // make sure we have some trace of the relation, even if it's empty + timeline.put_truncation(dst_rel, lsn, 0)?; + } + + num_rels_copied += 1; + } + info!( + "Created database {}/{}, copied {} blocks in {} rels at {}", + tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn + ); + Ok(()) +} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index dbdf69a03e..3110ab26f4 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -7,6 +7,7 @@ //! use crate::page_cache; +use crate::restore_local_repo; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; @@ -184,7 +185,7 @@ fn walreceiver_main( while let Some((lsn, recdata)) = waldecoder.poll_decode()? 
{ let decoded = decode_wal_record(recdata.clone()); - timeline.save_decoded_record(decoded, recdata, lsn)?; + restore_local_repo::save_decoded_record(&*timeline, decoded, recdata, lsn)?; // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN