diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 797ee9f436..7b4b05ed18 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -10,7 +10,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::tenant::Timeline; use crate::walrecord::NeonWalRecord; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{self, bail, ensure, Context}; use bytes::{Buf, Bytes}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -97,7 +97,7 @@ impl Timeline { blknum: BlockNumber, lsn: Lsn, latest: bool, - ) -> Result { + ) -> anyhow::Result { ensure!(tag.relnode != 0, "invalid relnode"); let nblocks = self.get_rel_size(tag, lsn, latest)?; @@ -114,7 +114,13 @@ impl Timeline { } // Get size of a database in blocks - pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn, latest: bool) -> Result { + pub fn get_db_size( + &self, + spcnode: Oid, + dbnode: Oid, + lsn: Lsn, + latest: bool, + ) -> anyhow::Result { let mut total_blocks = 0; let rels = self.list_rels(spcnode, dbnode, lsn)?; @@ -127,7 +133,7 @@ impl Timeline { } /// Get size of a relation file - pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn, latest: bool) -> Result { + pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn, latest: bool) -> anyhow::Result { ensure!(tag.relnode != 0, "invalid relnode"); if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -162,7 +168,7 @@ impl Timeline { } /// Does relation exist? - pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn, _latest: bool) -> Result { + pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn, _latest: bool) -> anyhow::Result { ensure!(tag.relnode != 0, "invalid relnode"); // first try to lookup relation in cache @@ -180,7 +186,12 @@ impl Timeline { } /// Get a list of all existing relations in given tablespace and database. - pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result> { + pub fn list_rels( + &self, + spcnode: Oid, + dbnode: Oid, + lsn: Lsn, + ) -> anyhow::Result> { // fetch directory listing let key = rel_dir_to_key(spcnode, dbnode); let buf = self.get(key, lsn)?; @@ -204,7 +215,7 @@ impl Timeline { segno: u32, blknum: BlockNumber, lsn: Lsn, - ) -> Result { + ) -> anyhow::Result { let key = slru_block_to_key(kind, segno, blknum); self.get(key, lsn) } @@ -215,14 +226,19 @@ impl Timeline { kind: SlruKind, segno: u32, lsn: Lsn, - ) -> Result { + ) -> anyhow::Result { let key = slru_segment_size_to_key(kind, segno); let mut buf = self.get(key, lsn)?; Ok(buf.get_u32_le()) } /// Get size of an SLRU segment - pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result { + pub fn get_slru_segment_exists( + &self, + kind: SlruKind, + segno: u32, + lsn: Lsn, + ) -> anyhow::Result { // fetch directory listing let key = slru_dir_to_key(kind); let buf = self.get(key, lsn)?; @@ -239,7 +255,10 @@ impl Timeline { /// so it's not well defined which LSN you get if there were multiple commits /// "in flight" at that point in time. /// - pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result { + pub fn find_lsn_for_timestamp( + &self, + search_timestamp: TimestampTz, + ) -> anyhow::Result { let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn(); let min_lsn = *gc_cutoff_lsn_guard; let max_lsn = self.get_last_record_lsn(); @@ -308,7 +327,7 @@ impl Timeline { probe_lsn: Lsn, found_smaller: &mut bool, found_larger: &mut bool, - ) -> Result { + ) -> anyhow::Result { for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn)? { let nblocks = self.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)?; for blknum in (0..nblocks).rev() { @@ -333,7 +352,7 @@ impl Timeline { } /// Get a list of SLRU segments - pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result> { + pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> anyhow::Result> { // fetch directory entry let key = slru_dir_to_key(kind); @@ -343,14 +362,14 @@ impl Timeline { Ok(dir.segments) } - pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { + pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> anyhow::Result { let key = relmap_file_key(spcnode, dbnode); let buf = self.get(key, lsn)?; Ok(buf) } - pub fn list_dbdirs(&self, lsn: Lsn) -> Result> { + pub fn list_dbdirs(&self, lsn: Lsn) -> anyhow::Result> { // fetch directory entry let buf = self.get(DBDIR_KEY, lsn)?; let dir = DbDirectory::des(&buf)?; @@ -358,13 +377,13 @@ impl Timeline { Ok(dir.dbdirs) } - pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result { + pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { let key = twophase_file_key(xid); let buf = self.get(key, lsn)?; Ok(buf) } - pub fn list_twophase_files(&self, lsn: Lsn) -> Result> { + pub fn list_twophase_files(&self, lsn: Lsn) -> anyhow::Result> { // fetch directory entry let buf = self.get(TWOPHASEDIR_KEY, lsn)?; let dir = TwoPhaseDirectory::des(&buf)?; @@ -372,11 +391,11 @@ impl Timeline { Ok(dir.xids) } - pub fn get_control_file(&self, lsn: Lsn) -> Result { + pub fn get_control_file(&self, lsn: Lsn) -> anyhow::Result { self.get(CONTROLFILE_KEY, lsn) } - pub fn get_checkpoint(&self, lsn: Lsn) -> Result { + pub fn get_checkpoint(&self, lsn: Lsn) -> anyhow::Result { self.get(CHECKPOINT_KEY, lsn) } @@ -414,7 +433,7 @@ impl Timeline { /// Get a KeySpace that covers all the Keys that are in use at the given LSN. /// Anything that's not listed maybe removed from the underlying storage (from /// that LSN forwards). - pub fn collect_keyspace(&self, lsn: Lsn) -> Result { + pub fn collect_keyspace(&self, lsn: Lsn) -> anyhow::Result { // Iterate through key ranges, greedily packing them into partitions let mut result = KeySpaceAccum::new(); @@ -553,7 +572,7 @@ impl<'a> DatadirModification<'a> { /// /// This inserts the directory metadata entries that are assumed to /// always exist. - pub fn init_empty(&mut self) -> Result<()> { + pub fn init_empty(&mut self) -> anyhow::Result<()> { let buf = DbDirectory::ser(&DbDirectory { dbdirs: HashMap::new(), })?; @@ -586,7 +605,7 @@ impl<'a> DatadirModification<'a> { rel: RelTag, blknum: BlockNumber, rec: NeonWalRecord, - ) -> Result<()> { + ) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec)); Ok(()) @@ -599,7 +618,7 @@ impl<'a> DatadirModification<'a> { segno: u32, blknum: BlockNumber, rec: NeonWalRecord, - ) -> Result<()> { + ) -> anyhow::Result<()> { self.put( slru_block_to_key(kind, segno, blknum), Value::WalRecord(rec), @@ -613,7 +632,7 @@ impl<'a> DatadirModification<'a> { rel: RelTag, blknum: BlockNumber, img: Bytes, - ) -> Result<()> { + ) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); self.put(rel_block_to_key(rel, blknum), Value::Image(img)); Ok(()) @@ -625,13 +644,13 @@ impl<'a> DatadirModification<'a> { segno: u32, blknum: BlockNumber, img: Bytes, - ) -> Result<()> { + ) -> anyhow::Result<()> { self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img)); Ok(()) } /// Store a relmapper file (pg_filenode.map) in the repository - pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> Result<()> { + pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> anyhow::Result<()> { // Add it to the directory (if it doesn't exist already) let buf = self.get(DBDIR_KEY)?; let mut dbdir = DbDirectory::des(&buf)?; @@ -659,7 +678,7 @@ impl<'a> DatadirModification<'a> { Ok(()) } - pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> Result<()> { + pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> anyhow::Result<()> { // Add it to the directory entry let buf = self.get(TWOPHASEDIR_KEY)?; let mut dir = TwoPhaseDirectory::des(&buf)?; @@ -675,17 +694,17 @@ impl<'a> DatadirModification<'a> { Ok(()) } - pub fn put_control_file(&mut self, img: Bytes) -> Result<()> { + pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> { self.put(CONTROLFILE_KEY, Value::Image(img)); Ok(()) } - pub fn put_checkpoint(&mut self, img: Bytes) -> Result<()> { + pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> { self.put(CHECKPOINT_KEY, Value::Image(img)); Ok(()) } - pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> { + pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> { let req_lsn = self.tline.get_last_record_lsn(); let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn, true)?; @@ -714,7 +733,7 @@ impl<'a> DatadirModification<'a> { /// Create a relation fork. /// /// 'nblocks' is the initial size. - pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { + pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); // It's possible that this is the first rel for this db in this // tablespace. Create the reldir entry for it if so. @@ -758,7 +777,7 @@ impl<'a> DatadirModification<'a> { } /// Truncate relation - pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { + pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); let last_lsn = self.tline.get_last_record_lsn(); if self.tline.get_rel_exists(rel, last_lsn, true)? { @@ -784,7 +803,7 @@ impl<'a> DatadirModification<'a> { /// Extend relation /// If new size is smaller, do nothing. - pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { + pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); // Put size @@ -805,7 +824,7 @@ impl<'a> DatadirModification<'a> { } /// Drop a relation. - pub fn put_rel_drop(&mut self, rel: RelTag) -> Result<()> { + pub fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); // Remove it from the directory entry @@ -838,7 +857,7 @@ impl<'a> DatadirModification<'a> { kind: SlruKind, segno: u32, nblocks: BlockNumber, - ) -> Result<()> { + ) -> anyhow::Result<()> { // Add it to the directory entry let dir_key = slru_dir_to_key(kind); let buf = self.get(dir_key)?; @@ -868,7 +887,7 @@ impl<'a> DatadirModification<'a> { kind: SlruKind, segno: u32, nblocks: BlockNumber, - ) -> Result<()> { + ) -> anyhow::Result<()> { // Put size let size_key = slru_segment_size_to_key(kind, segno); let buf = nblocks.to_le_bytes(); @@ -877,7 +896,7 @@ impl<'a> DatadirModification<'a> { } /// This method is used for marking truncated SLRU files - pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> Result<()> { + pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> { // Remove it from the directory entry let dir_key = slru_dir_to_key(kind); let buf = self.get(dir_key)?; @@ -898,13 +917,13 @@ impl<'a> DatadirModification<'a> { } /// Drop a relmapper file (pg_filenode.map) - pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<()> { + pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> { // TODO Ok(()) } /// This method is used for marking truncated SLRU files - pub fn drop_twophase_file(&mut self, xid: TransactionId) -> Result<()> { + pub fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> { // Remove it from the directory entry let buf = self.get(TWOPHASEDIR_KEY)?; let mut dir = TwoPhaseDirectory::des(&buf)?; @@ -941,7 +960,7 @@ impl<'a> DatadirModification<'a> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. - pub fn flush(&mut self) -> Result<()> { + pub fn flush(&mut self) -> anyhow::Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. let pending_nblocks = self.pending_nblocks; @@ -952,7 +971,7 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer(); // Flush relation and SLRU data blocks, keep metadata. - let mut result: Result<()> = Ok(()); + let mut result: anyhow::Result<()> = Ok(()); self.pending_updates.retain(|&key, value| { if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { result = writer.put(key, self.lsn, value); @@ -1000,7 +1019,7 @@ impl<'a> DatadirModification<'a> { // Internal helper functions to batch the modifications - fn get(&self, key: Key) -> Result { + fn get(&self, key: Key) -> anyhow::Result { // Have we already updated the same key? Read the pending updated // version in that case. // @@ -1370,7 +1389,7 @@ const CHECKPOINT_KEY: Key = Key { // Reverse mappings for a few Keys. // These are needed by WAL redo manager. -pub fn key_to_rel_block(key: Key) -> Result<(RelTag, BlockNumber)> { +pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { Ok(match key.field1 { 0x00 => ( RelTag { @@ -1400,7 +1419,7 @@ pub fn is_rel_vm_block_key(key: Key) -> bool { && key.field6 != 0xffffffff } -pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> { +pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> { Ok(match key.field1 { 0x01 => { let kind = match key.field2 { @@ -1429,7 +1448,7 @@ pub fn create_test_timeline( tenant: &crate::tenant::Tenant, timeline_id: utils::id::TimelineId, pg_version: u32, -) -> Result> { +) -> anyhow::Result> { let tline = tenant .create_empty_timeline(timeline_id, Lsn(8), pg_version)? .initialize()?;