mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
no-op: pgdatadir_mapping: qualified use of anyhow::Result
This commit is contained in:
committed by
Christian Schwarz
parent
749a2f00d7
commit
2460987328
@@ -10,7 +10,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use anyhow::{self, bail, ensure, Context};
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
@@ -97,7 +97,7 @@ impl Timeline {
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
) -> Result<Bytes> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
let nblocks = self.get_rel_size(tag, lsn, latest)?;
|
||||
@@ -114,7 +114,13 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn, latest: bool) -> Result<usize> {
|
||||
pub fn get_db_size(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
) -> anyhow::Result<usize> {
|
||||
let mut total_blocks = 0;
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn)?;
|
||||
@@ -127,7 +133,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn, latest: bool) -> Result<BlockNumber> {
|
||||
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn, latest: bool) -> anyhow::Result<BlockNumber> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
@@ -162,7 +168,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn, _latest: bool) -> Result<bool> {
|
||||
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn, _latest: bool) -> anyhow::Result<bool> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
// first try to lookup relation in cache
|
||||
@@ -180,7 +186,12 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get a list of all existing relations in given tablespace and database.
|
||||
pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
|
||||
pub fn list_rels(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<HashSet<RelTag>> {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = self.get(key, lsn)?;
|
||||
@@ -204,7 +215,7 @@ impl Timeline {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
) -> Result<Bytes> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
self.get(key, lsn)
|
||||
}
|
||||
@@ -215,14 +226,19 @@ impl Timeline {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
lsn: Lsn,
|
||||
) -> Result<BlockNumber> {
|
||||
) -> anyhow::Result<BlockNumber> {
|
||||
let key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(key, lsn)?;
|
||||
Ok(buf.get_u32_le())
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
|
||||
pub fn get_slru_segment_exists(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
// fetch directory listing
|
||||
let key = slru_dir_to_key(kind);
|
||||
let buf = self.get(key, lsn)?;
|
||||
@@ -239,7 +255,10 @@ impl Timeline {
|
||||
/// so it's not well defined which LSN you get if there were multiple commits
|
||||
/// "in flight" at that point in time.
|
||||
///
|
||||
pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
|
||||
pub fn find_lsn_for_timestamp(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
) -> anyhow::Result<LsnForTimestamp> {
|
||||
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
|
||||
let min_lsn = *gc_cutoff_lsn_guard;
|
||||
let max_lsn = self.get_last_record_lsn();
|
||||
@@ -308,7 +327,7 @@ impl Timeline {
|
||||
probe_lsn: Lsn,
|
||||
found_smaller: &mut bool,
|
||||
found_larger: &mut bool,
|
||||
) -> Result<bool> {
|
||||
) -> anyhow::Result<bool> {
|
||||
for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn)? {
|
||||
let nblocks = self.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)?;
|
||||
for blknum in (0..nblocks).rev() {
|
||||
@@ -333,7 +352,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get a list of SLRU segments
|
||||
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
|
||||
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> anyhow::Result<HashSet<u32>> {
|
||||
// fetch directory entry
|
||||
let key = slru_dir_to_key(kind);
|
||||
|
||||
@@ -343,14 +362,14 @@ impl Timeline {
|
||||
Ok(dir.segments)
|
||||
}
|
||||
|
||||
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> anyhow::Result<Bytes> {
|
||||
let key = relmap_file_key(spcnode, dbnode);
|
||||
|
||||
let buf = self.get(key, lsn)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
|
||||
pub fn list_dbdirs(&self, lsn: Lsn) -> anyhow::Result<HashMap<(Oid, Oid), bool>> {
|
||||
// fetch directory entry
|
||||
let buf = self.get(DBDIR_KEY, lsn)?;
|
||||
let dir = DbDirectory::des(&buf)?;
|
||||
@@ -358,13 +377,13 @@ impl Timeline {
|
||||
Ok(dir.dbdirs)
|
||||
}
|
||||
|
||||
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result<Bytes> {
|
||||
let key = twophase_file_key(xid);
|
||||
let buf = self.get(key, lsn)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
|
||||
pub fn list_twophase_files(&self, lsn: Lsn) -> anyhow::Result<HashSet<TransactionId>> {
|
||||
// fetch directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
|
||||
let dir = TwoPhaseDirectory::des(&buf)?;
|
||||
@@ -372,11 +391,11 @@ impl Timeline {
|
||||
Ok(dir.xids)
|
||||
}
|
||||
|
||||
pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get_control_file(&self, lsn: Lsn) -> anyhow::Result<Bytes> {
|
||||
self.get(CONTROLFILE_KEY, lsn)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get_checkpoint(&self, lsn: Lsn) -> anyhow::Result<Bytes> {
|
||||
self.get(CHECKPOINT_KEY, lsn)
|
||||
}
|
||||
|
||||
@@ -414,7 +433,7 @@ impl Timeline {
|
||||
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
|
||||
/// Anything that's not listed maybe removed from the underlying storage (from
|
||||
/// that LSN forwards).
|
||||
pub fn collect_keyspace(&self, lsn: Lsn) -> Result<KeySpace> {
|
||||
pub fn collect_keyspace(&self, lsn: Lsn) -> anyhow::Result<KeySpace> {
|
||||
// Iterate through key ranges, greedily packing them into partitions
|
||||
let mut result = KeySpaceAccum::new();
|
||||
|
||||
@@ -553,7 +572,7 @@ impl<'a> DatadirModification<'a> {
|
||||
///
|
||||
/// This inserts the directory metadata entries that are assumed to
|
||||
/// always exist.
|
||||
pub fn init_empty(&mut self) -> Result<()> {
|
||||
pub fn init_empty(&mut self) -> anyhow::Result<()> {
|
||||
let buf = DbDirectory::ser(&DbDirectory {
|
||||
dbdirs: HashMap::new(),
|
||||
})?;
|
||||
@@ -586,7 +605,7 @@ impl<'a> DatadirModification<'a> {
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
|
||||
Ok(())
|
||||
@@ -599,7 +618,7 @@ impl<'a> DatadirModification<'a> {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.put(
|
||||
slru_block_to_key(kind, segno, blknum),
|
||||
Value::WalRecord(rec),
|
||||
@@ -613,7 +632,7 @@ impl<'a> DatadirModification<'a> {
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
Ok(())
|
||||
@@ -625,13 +644,13 @@ impl<'a> DatadirModification<'a> {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Store a relmapper file (pg_filenode.map) in the repository
|
||||
pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> Result<()> {
|
||||
pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> anyhow::Result<()> {
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = self.get(DBDIR_KEY)?;
|
||||
let mut dbdir = DbDirectory::des(&buf)?;
|
||||
@@ -659,7 +678,7 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> Result<()> {
|
||||
pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> anyhow::Result<()> {
|
||||
// Add it to the directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY)?;
|
||||
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
||||
@@ -675,17 +694,17 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_control_file(&mut self, img: Bytes) -> Result<()> {
|
||||
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
|
||||
self.put(CONTROLFILE_KEY, Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<()> {
|
||||
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
|
||||
self.put(CHECKPOINT_KEY, Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
|
||||
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
|
||||
let req_lsn = self.tline.get_last_record_lsn();
|
||||
|
||||
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn, true)?;
|
||||
@@ -714,7 +733,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// Create a relation fork.
|
||||
///
|
||||
/// 'nblocks' is the initial size.
|
||||
pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
@@ -758,7 +777,7 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// Truncate relation
|
||||
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
if self.tline.get_rel_exists(rel, last_lsn, true)? {
|
||||
@@ -784,7 +803,7 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
/// Extend relation
|
||||
/// If new size is smaller, do nothing.
|
||||
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
|
||||
// Put size
|
||||
@@ -805,7 +824,7 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// Drop a relation.
|
||||
pub fn put_rel_drop(&mut self, rel: RelTag) -> Result<()> {
|
||||
pub fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
|
||||
// Remove it from the directory entry
|
||||
@@ -838,7 +857,7 @@ impl<'a> DatadirModification<'a> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
// Add it to the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key)?;
|
||||
@@ -868,7 +887,7 @@ impl<'a> DatadirModification<'a> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
// Put size
|
||||
let size_key = slru_segment_size_to_key(kind, segno);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
@@ -877,7 +896,7 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// This method is used for marking truncated SLRU files
|
||||
pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> Result<()> {
|
||||
pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
|
||||
// Remove it from the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key)?;
|
||||
@@ -898,13 +917,13 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// Drop a relmapper file (pg_filenode.map)
|
||||
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<()> {
|
||||
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
|
||||
// TODO
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This method is used for marking truncated SLRU files
|
||||
pub fn drop_twophase_file(&mut self, xid: TransactionId) -> Result<()> {
|
||||
pub fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
|
||||
// Remove it from the directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY)?;
|
||||
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
||||
@@ -941,7 +960,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
pub fn flush(&mut self) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -952,7 +971,7 @@ impl<'a> DatadirModification<'a> {
|
||||
let writer = self.tline.writer();
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: Result<()> = Ok(());
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
@@ -1000,7 +1019,7 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
fn get(&self, key: Key) -> Result<Bytes> {
|
||||
fn get(&self, key: Key) -> anyhow::Result<Bytes> {
|
||||
// Have we already updated the same key? Read the pending updated
|
||||
// version in that case.
|
||||
//
|
||||
@@ -1370,7 +1389,7 @@ const CHECKPOINT_KEY: Key = Key {
|
||||
// Reverse mappings for a few Keys.
|
||||
// These are needed by WAL redo manager.
|
||||
|
||||
pub fn key_to_rel_block(key: Key) -> Result<(RelTag, BlockNumber)> {
|
||||
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x00 => (
|
||||
RelTag {
|
||||
@@ -1400,7 +1419,7 @@ pub fn is_rel_vm_block_key(key: Key) -> bool {
|
||||
&& key.field6 != 0xffffffff
|
||||
}
|
||||
|
||||
pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
|
||||
pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x01 => {
|
||||
let kind = match key.field2 {
|
||||
@@ -1429,7 +1448,7 @@ pub fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
) -> Result<std::sync::Arc<Timeline>> {
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant
|
||||
.create_empty_timeline(timeline_id, Lsn(8), pg_version)?
|
||||
.initialize()?;
|
||||
|
||||
Reference in New Issue
Block a user