Replace assert! with ensure! for anyhow::Result functions

This commit is contained in:
Kirill Bulatov
2022-03-18 20:59:55 +02:00
committed by Kirill Bulatov
parent edc7bebcb5
commit f6b1d76c30
7 changed files with 48 additions and 43 deletions

View File

@@ -10,7 +10,7 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{Context, Result};
use anyhow::{ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use log::*;
use std::fmt::Write as FmtWrite;
@@ -163,7 +163,7 @@ impl<'a> Basebackup<'a> {
let img =
self.timeline
.get_page_at_lsn(RelishTag::Slru { slru, segno }, blknum, self.lsn)?;
assert!(img.len() == pg_constants::BLCKSZ as usize);
ensure!(img.len() == pg_constants::BLCKSZ as usize);
slru_buf.extend_from_slice(&img);
}
@@ -197,7 +197,7 @@ impl<'a> Basebackup<'a> {
String::from("global/pg_filenode.map") // filenode map for global tablespace
} else {
// User defined tablespaces are not supported
assert!(spcnode == pg_constants::DEFAULTTABLESPACE_OID);
ensure!(spcnode == pg_constants::DEFAULTTABLESPACE_OID);
// Append dir path for each database
let path = format!("base/{}", dbnode);
@@ -211,7 +211,7 @@ impl<'a> Basebackup<'a> {
format!("base/{}/pg_filenode.map", dbnode)
};
assert!(img.len() == 512);
ensure!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
self.ar.append(&header, &img[..])?;
Ok(())
@@ -292,7 +292,7 @@ impl<'a> Basebackup<'a> {
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier);
assert!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE);
ensure!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?;
Ok(())
}

View File

@@ -791,10 +791,10 @@ impl Timeline for LayeredTimeline {
}
/// Wait until WAL has been received up to the given LSN.
fn wait_lsn(&self, lsn: Lsn) -> Result<()> {
fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
// This should never be called from the WAL receiver thread, because that could lead
// to a deadlock.
assert!(
ensure!(
!IS_WAL_RECEIVER.with(|c| c.get()),
"wait_lsn called by WAL receiver thread"
);
@@ -1262,7 +1262,7 @@ impl LayeredTimeline {
seg: SegmentTag,
lsn: Lsn,
self_layers: &MutexGuard<LayerMap>,
) -> Result<Option<(Arc<dyn Layer>, Lsn)>> {
) -> anyhow::Result<Option<(Arc<dyn Layer>, Lsn)>> {
trace!("get_layer_for_read called for {} at {}", seg, lsn);
// If you requested a page at an older LSN, before the branch point, dig into
@@ -1310,7 +1310,7 @@ impl LayeredTimeline {
layer.get_end_lsn()
);
assert!(layer.get_start_lsn() <= lsn);
ensure!(layer.get_start_lsn() <= lsn);
if layer.is_dropped() && layer.get_end_lsn() <= lsn {
return Ok(None);
@@ -1338,13 +1338,13 @@ impl LayeredTimeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.lock().unwrap();
assert!(lsn.is_aligned());
ensure!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
assert!(
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
@@ -1360,7 +1360,7 @@ impl LayeredTimeline {
// Open layer exists, but it is dropped, so create a new one.
if open_layer.is_dropped() {
assert!(!open_layer.is_writeable());
ensure!(!open_layer.is_writeable());
// Layer that is created after dropped one represents a new relish segment.
trace!(
"creating layer for write for new relish segment after dropped layer {} at {}/{}",

View File

@@ -209,10 +209,10 @@ impl Layer for DeltaLayer {
blknum: SegmentBlk,
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
) -> anyhow::Result<PageReconstructResult> {
let mut need_image = true;
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
ensure!((0..RELISH_SEG_SIZE).contains(&blknum));
match &reconstruct_data.page_img {
Some((cached_lsn, _)) if &self.end_lsn <= cached_lsn => {
@@ -289,8 +289,8 @@ impl Layer for DeltaLayer {
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);
fn get_seg_size(&self, lsn: Lsn) -> anyhow::Result<SegmentBlk> {
ensure!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
@@ -641,7 +641,7 @@ impl DeltaLayerWriter {
///
/// 'seg_sizes' is a list of size changes to store with the actual data.
///
pub fn finish(self, seg_sizes: VecMap<Lsn, SegmentBlk>) -> Result<DeltaLayer> {
pub fn finish(self, seg_sizes: VecMap<Lsn, SegmentBlk>) -> anyhow::Result<DeltaLayer> {
// Close the page-versions chapter
let book = self.page_version_writer.close()?;
@@ -652,7 +652,7 @@ impl DeltaLayerWriter {
let book = chapter.close()?;
if self.seg.rel.is_blocky() {
assert!(!seg_sizes.is_empty());
ensure!(!seg_sizes.is_empty());
}
// and seg_sizes to separate chapter

View File

@@ -146,9 +146,9 @@ impl Layer for ImageLayer {
blknum: SegmentBlk,
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
assert!(lsn >= self.lsn);
) -> anyhow::Result<PageReconstructResult> {
ensure!((0..RELISH_SEG_SIZE).contains(&blknum));
ensure!(lsn >= self.lsn);
match reconstruct_data.page_img {
Some((cached_lsn, _)) if self.lsn <= cached_lsn => {
@@ -432,7 +432,7 @@ impl ImageLayerWriter {
seg: SegmentTag,
lsn: Lsn,
num_blocks: SegmentBlk,
) -> Result<ImageLayerWriter> {
) -> anyhow::Result<ImageLayerWriter> {
// Create the file
//
// Note: This overwrites any existing file. There shouldn't be any.
@@ -452,7 +452,7 @@ impl ImageLayerWriter {
let chapter = if seg.rel.is_blocky() {
book.new_chapter(BLOCKY_IMAGES_CHAPTER)
} else {
assert_eq!(num_blocks, 1);
ensure!(num_blocks == 1);
book.new_chapter(NONBLOCKY_IMAGE_CHAPTER)
};
@@ -475,19 +475,19 @@ impl ImageLayerWriter {
///
/// The page versions must be appended in blknum order.
///
pub fn put_page_image(&mut self, block_bytes: &[u8]) -> Result<()> {
assert!(self.num_blocks_written < self.num_blocks);
pub fn put_page_image(&mut self, block_bytes: &[u8]) -> anyhow::Result<()> {
ensure!(self.num_blocks_written < self.num_blocks);
if self.seg.rel.is_blocky() {
assert_eq!(block_bytes.len(), BLOCK_SIZE);
ensure!(block_bytes.len() == BLOCK_SIZE);
}
self.page_image_writer.write_all(block_bytes)?;
self.num_blocks_written += 1;
Ok(())
}
pub fn finish(self) -> Result<ImageLayer> {
pub fn finish(self) -> anyhow::Result<ImageLayer> {
// Check that the `put_page_image' was called for every block.
assert!(self.num_blocks_written == self.num_blocks);
ensure!(self.num_blocks_written == self.num_blocks);
// Close the page-images chapter
let book = self.page_image_writer.close()?;

View File

@@ -17,7 +17,7 @@ use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::ZERO_PAGE;
use crate::repository::ZenithWalRecord;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{ensure, Result, bail};
use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use std::collections::HashMap;
@@ -224,10 +224,10 @@ impl Layer for InMemoryLayer {
blknum: SegmentBlk,
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
) -> anyhow::Result<PageReconstructResult> {
let mut need_image = true;
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
ensure!((0..RELISH_SEG_SIZE).contains(&blknum));
{
let inner = self.inner.read().unwrap();
@@ -288,8 +288,8 @@ impl Layer for InMemoryLayer {
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);
fn get_seg_size(&self, lsn: Lsn) -> anyhow::Result<SegmentBlk> {
ensure!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
@@ -300,13 +300,13 @@ impl Layer for InMemoryLayer {
}
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
fn get_seg_exists(&self, lsn: Lsn) -> anyhow::Result<bool> {
let inner = self.inner.read().unwrap();
// If the segment created after requested LSN,
// it doesn't exist in the layer. But we shouldn't
// have requested it in the first place.
assert!(lsn >= self.start_lsn);
ensure!(lsn >= self.start_lsn);
// Is the requested LSN after the segment was dropped?
if inner.dropped {
@@ -466,8 +466,13 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: SegmentBlk, lsn: Lsn, pv: PageVersion) -> Result<u32> {
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
pub fn put_page_version(
&self,
blknum: SegmentBlk,
lsn: Lsn,
pv: PageVersion,
) -> anyhow::Result<u32> {
ensure!((0..RELISH_SEG_SIZE).contains(&blknum));
trace!(
"put_page_version blk {} of {} at {}/{}",
@@ -479,7 +484,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
assert!(lsn >= inner.latest_lsn);
ensure!(lsn >= inner.latest_lsn);
inner.latest_lsn = lsn;
// Write the page version to the file, and remember its offset in 'page_versions'

View File

@@ -96,7 +96,7 @@ impl TimelineMetadata {
);
let data = TimelineMetadata::from(serialize::DeTimelineMetadata::des_prefix(data)?);
assert!(data.disk_consistent_lsn.is_aligned());
ensure!(data.disk_consistent_lsn.is_aligned());
Ok(data)
}
@@ -104,7 +104,7 @@ impl TimelineMetadata {
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let serializeable_metadata = serialize::SeTimelineMetadata::from(self);
let mut metadata_bytes = serialize::SeTimelineMetadata::ser(&serializeable_metadata)?;
assert!(metadata_bytes.len() <= METADATA_MAX_DATA_SIZE);
ensure!(metadata_bytes.len() <= METADATA_MAX_DATA_SIZE);
metadata_bytes.resize(METADATA_MAX_SAFE_SIZE, 0u8);
let checksum = crc32c::crc32c(&metadata_bytes[..METADATA_MAX_DATA_SIZE]);

View File

@@ -146,7 +146,7 @@ fn walreceiver_main(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
wal_producer_connstr: &str,
) -> Result<(), Error> {
) -> anyhow::Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!(
@@ -255,7 +255,7 @@ fn walreceiver_main(
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hittind a deadlock.
assert!(lsn.is_aligned());
anyhow::ensure!(lsn.is_aligned());
let writer = timeline.writer();
walingest.ingest_record(writer.as_ref(), recdata, lsn)?;