diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 940e3adebc..8b6dc04069 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::str::FromStr; use std::{fs, thread, time}; -use anyhow::{anyhow, Result}; +use anyhow::{bail, Result}; use postgres::{Client, Transaction}; use serde::Deserialize; @@ -226,7 +226,7 @@ pub fn wait_for_postgres(port: &str, pgdata: &Path) -> Result<()> { // but postgres starts listening almost immediately, even if it is not really // ready to accept connections). if slept >= POSTGRES_WAIT_TIMEOUT { - return Err(anyhow!("timed out while waiting for Postgres to start")); + bail!("timed out while waiting for Postgres to start"); } if pid_path.exists() { diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index a851126d67..d8bac30baf 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -82,15 +82,11 @@ impl ComputeControlPlane { let mut strings = s.split('@'); let name = strings.next().unwrap(); - let lsn: Option; - if let Some(lsnstr) = strings.next() { - lsn = Some( - Lsn::from_str(lsnstr) - .with_context(|| "invalid LSN in point-in-time specification")?, - ); - } else { - lsn = None - } + let lsn = strings + .next() + .map(Lsn::from_str) + .transpose() + .context("invalid LSN in point-in-time specification")?; // Resolve the timeline ID, given the human-readable branch name let timeline_id = self @@ -253,16 +249,16 @@ impl PostgresNode { let mut client = self .pageserver .page_server_psql_client() - .with_context(|| "connecting to page server failed")?; + .context("connecting to page server failed")?; let copyreader = client .copy_out(sql.as_str()) - .with_context(|| "page server 'basebackup' command failed")?; + .context("page server 'basebackup' command failed")?; // Read the archive directly from the `CopyOutReader` tar::Archive::new(copyreader) .unpack(&self.pgdata()) - .with_context(|| "extracting base backup failed")?; + .context("extracting base backup failed")?; Ok(()) } @@ -443,7 +439,7 @@ impl PostgresNode { if let Some(token) = auth_token { cmd.env("ZENITH_AUTH_TOKEN", token); } - let pg_ctl = cmd.status().with_context(|| "pg_ctl failed")?; + let pg_ctl = cmd.status().context("pg_ctl failed")?; if !pg_ctl.success() { anyhow::bail!("pg_ctl failed"); diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 376d78a9cb..b80e137cb9 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -251,7 +251,7 @@ impl LocalEnv { .arg("2048") .stdout(Stdio::null()) .output() - .with_context(|| "failed to generate auth private key")?; + .context("failed to generate auth private key")?; if !keygen_output.status.success() { bail!( "openssl failed: '{}'", @@ -270,7 +270,7 @@ impl LocalEnv { .args(&["-out", public_key_path.to_str().unwrap()]) .stdout(Stdio::null()) .output() - .with_context(|| "failed to generate auth private key")?; + .context("failed to generate auth private key")?; if !keygen_output.status.success() { bail!( "openssl failed: '{}'", diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs index 7f50fe9c2f..83765b2c95 100644 --- a/control_plane/src/postgresql_conf.rs +++ b/control_plane/src/postgresql_conf.rs @@ -4,7 +4,7 @@ /// NOTE: This doesn't implement the full, correct postgresql.conf syntax. Just /// enough to extract a few settings we need in Zenith, assuming you don't do /// funny stuff like include-directives or funny escaping. -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use lazy_static::lazy_static; use regex::Regex; use std::collections::HashMap; @@ -78,7 +78,7 @@ impl PostgresConf { ::Err: std::error::Error + Send + Sync + 'static, { self.get(field_name) - .ok_or_else(|| anyhow!("could not find '{}' option {}", field_name, context))? + .with_context(|| format!("could not find '{}' option {}", field_name, context))? .parse::() .with_context(|| format!("could not parse '{}' option {}", field_name, context)) } diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 32652f3b02..662050018b 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -4,7 +4,7 @@ // TODO: move all paths construction to conf impl // -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use postgres_ffi::ControlFileData; use serde::{Deserialize, Serialize}; use std::{ @@ -118,7 +118,7 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str if let Some(tenantid) = create_tenant { let tenantid = ZTenantId::from_str(tenantid)?; println!("initializing tenantid {}", tenantid); - create_repo(conf, tenantid, dummy_redo_mgr).with_context(|| "failed to create repo")?; + create_repo(conf, tenantid, dummy_redo_mgr).context("failed to create repo")?; } crashsafe_dir::create_dir_all(conf.tenants_path())?; @@ -197,7 +197,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .stdout(Stdio::null()) .output() - .with_context(|| "failed to execute initdb")?; + .context("failed to execute initdb")?; if !initdb_output.status.success() { anyhow::bail!( "initdb failed: '{}'", @@ -308,7 +308,7 @@ pub(crate) fn create_branch( let timeline = repo .get_timeline(startpoint.timelineid)? .local_timeline() - .ok_or_else(|| anyhow!("Cannot branch off the timeline that's not present locally"))?; + .context("Cannot branch off the timeline that's not present locally")?; if startpoint.lsn == Lsn(0) { // Find end of WAL on the old timeline let end_of_wal = timeline.get_last_record_lsn(); @@ -383,14 +383,11 @@ fn parse_point_in_time( let mut strings = s.split('@'); let name = strings.next().unwrap(); - let lsn: Option; - if let Some(lsnstr) = strings.next() { - lsn = Some( - Lsn::from_str(lsnstr).with_context(|| "invalid LSN in point-in-time specification")?, - ); - } else { - lsn = None - } + let lsn = strings + .next() + .map(Lsn::from_str) + .transpose() + .context("invalid LSN in point-in-time specification")?; // Check if it's a tag if lsn.is_none() { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 6e0d907f6b..e89ec562ed 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -4,7 +4,7 @@ //! file, or on the command line. //! See also `settings.md` for better description on every parameter. -use anyhow::{anyhow, bail, ensure, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use toml_edit; use toml_edit::{Document, Item}; use zenith_utils::postgres_backend::AuthType; @@ -306,9 +306,7 @@ impl PageServerConf { }) .ok() .and_then(NonZeroUsize::new) - .ok_or_else(|| { - anyhow!("'max_concurrent_sync' must be a non-zero positive integer") - })? + .context("'max_concurrent_sync' must be a non-zero positive integer")? } else { NonZeroUsize::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC).unwrap() }; @@ -321,7 +319,7 @@ impl PageServerConf { }) .ok() .and_then(NonZeroU32::new) - .ok_or_else(|| anyhow!("'max_sync_errors' must be a non-zero positive integer"))? + .context("'max_sync_errors' must be a non-zero positive integer")? } else { NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap() }; @@ -396,7 +394,7 @@ impl PageServerConf { fn parse_toml_string(name: &str, item: &Item) -> Result { let s = item .as_str() - .ok_or_else(|| anyhow!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {} is not a string", name))?; Ok(s.to_string()) } @@ -405,7 +403,7 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result { // for our use, though. let i: i64 = item .as_integer() - .ok_or_else(|| anyhow!("configure option {} is not an integer", name))?; + .with_context(|| format!("configure option {} is not an integer", name))?; if i < 0 { bail!("configure option {} cannot be negative", name); } @@ -415,7 +413,7 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result { fn parse_toml_duration(name: &str, item: &Item) -> Result { let s = item .as_str() - .ok_or_else(|| anyhow!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {} is not a string", name))?; Ok(humantime::parse_duration(s)?) } @@ -423,7 +421,7 @@ fn parse_toml_duration(name: &str, item: &Item) -> Result { fn parse_toml_auth_type(name: &str, item: &Item) -> Result { let v = item .as_str() - .ok_or_else(|| anyhow!("configure option {} is not a string", name))?; + .with_context(|| format!("configure option {} is not a string", name))?; AuthType::from_str(v) } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index d4e8b9ae72..e317118bb5 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -7,7 +7,7 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use tracing::*; @@ -126,7 +126,7 @@ pub fn import_timeline_from_postgres_datadir( writer.advance_last_record_lsn(lsn); // We expect the Postgres server to be shut down cleanly. - let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?; + let pg_control = pg_control.context("pg_control file not found")?; ensure!( pg_control.state == DBState_DB_SHUTDOWNED, "Postgres cluster was not shut down cleanly" diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 5b8ad61c5f..2871ae2db8 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -11,7 +11,7 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{anyhow, bail, ensure, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use bookfile::Book; use bytes::Bytes; use lazy_static::lazy_static; @@ -2171,11 +2171,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { let oldsize = self .tl .get_relish_size(rel, self.tl.get_last_record_lsn())? - .ok_or_else(|| { - anyhow!( + .with_context(|| { + format!( "attempted to truncate non-existent relish {} at {}", - rel, - lsn + rel, lsn ) })?; @@ -2298,8 +2297,5 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { } } - Err(anyhow!( - "couldn't find an unused backup number for {:?}", - path - )) + bail!("couldn't find an unused backup number for {:?}", path) } diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 41692fcac6..17d5eef0a5 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -169,7 +169,7 @@ impl DeltaLayerInner { if let Some((_entry_lsn, entry)) = slice.last() { Ok(*entry) } else { - Err(anyhow::anyhow!("could not find seg size in delta layer")) + bail!("could not find seg size in delta layer") } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 70ba7ec927..d8ee59375b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,7 +10,7 @@ // *callmemaybe $url* -- ask pageserver to start walreceiver on $url // -use anyhow::{anyhow, bail, ensure, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use regex::Regex; @@ -624,7 +624,7 @@ impl postgres_backend::Handler for PageServerHandler { let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); let caps = re .captures(query_string) - .ok_or_else(|| anyhow!("invalid callmemaybe: '{}'", query_string))?; + .with_context(|| format!("invalid callmemaybe: '{}'", query_string))?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; @@ -643,18 +643,18 @@ impl postgres_backend::Handler for PageServerHandler { pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("branch_create ") { - let err = || anyhow!("invalid branch_create: '{}'", query_string); + let err = || format!("invalid branch_create: '{}'", query_string); // branch_create // TODO lazy static // TODO: escaping, to allow branch names with spaces let re = Regex::new(r"^branch_create ([[:xdigit:]]+) (\S+) ([^\r\n\s;]+)[\r\n\s;]*;?$") .unwrap(); - let caps = re.captures(query_string).ok_or_else(err)?; + let caps = re.captures(query_string).with_context(err)?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let branchname = caps.get(2).ok_or_else(err)?.as_str().to_owned(); - let startpoint_str = caps.get(3).ok_or_else(err)?.as_str().to_owned(); + let branchname = caps.get(2).with_context(err)?.as_str().to_owned(); + let startpoint_str = caps.get(3).with_context(err)?.as_str().to_owned(); self.check_permission(Some(tenantid))?; @@ -673,7 +673,7 @@ impl postgres_backend::Handler for PageServerHandler { let re = Regex::new(r"^branch_list ([[:xdigit:]]+)$").unwrap(); let caps = re .captures(query_string) - .ok_or_else(|| anyhow!("invalid branch_list: '{}'", query_string))?; + .with_context(|| format!("invalid branch_list: '{}'", query_string))?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; @@ -693,11 +693,11 @@ impl postgres_backend::Handler for PageServerHandler { .write_message_noflush(&BeMessage::DataRow(&[Some(&tenants_buf)]))? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("tenant_create") { - let err = || anyhow!("invalid tenant_create: '{}'", query_string); + let err = || format!("invalid tenant_create: '{}'", query_string); // tenant_create let re = Regex::new(r"^tenant_create ([[:xdigit:]]+)$").unwrap(); - let caps = re.captures(query_string).ok_or_else(err)?; + let caps = re.captures(query_string).with_context(err)?; self.check_permission(None)?; @@ -728,7 +728,7 @@ impl postgres_backend::Handler for PageServerHandler { let caps = re .captures(query_string) - .ok_or_else(|| anyhow!("invalid do_gc: '{}'", query_string))?; + .with_context(|| format!("invalid do_gc: '{}'", query_string))?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; @@ -812,7 +812,7 @@ impl postgres_backend::Handler for PageServerHandler { let caps = re .captures(query_string) - .ok_or_else(|| anyhow!("invalid checkpoint command: '{}'", query_string))?; + .with_context(|| format!("invalid checkpoint command: '{}'", query_string))?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; diff --git a/pageserver/src/remote_storage/storage_sync/compression.rs b/pageserver/src/remote_storage/storage_sync/compression.rs index 251c7b0184..12c33e7107 100644 --- a/pageserver/src/remote_storage/storage_sync/compression.rs +++ b/pageserver/src/remote_storage/storage_sync/compression.rs @@ -34,7 +34,7 @@ use std::{ sync::Arc, }; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{bail, ensure, Context}; use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; use serde::{Deserialize, Serialize}; use tokio::{ @@ -211,16 +211,18 @@ pub async fn read_archive_header( pub fn parse_archive_name(archive_path: &Path) -> anyhow::Result<(Lsn, u64)> { let archive_name = archive_path .file_name() - .ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))? + .with_context(|| format!("Archive '{}' has no file name", archive_path.display()))? .to_string_lossy(); let (lsn_str, header_size_str) = - archive_name.rsplit_once(ARCHIVE_EXTENSION).ok_or_else(|| { - anyhow!( - "Archive '{}' has incorrect extension, expected to contain '{}'", - archive_path.display(), - ARCHIVE_EXTENSION - ) - })?; + archive_name + .rsplit_once(ARCHIVE_EXTENSION) + .with_context(|| { + format!( + "Archive '{}' has incorrect extension, expected to contain '{}'", + archive_path.display(), + ARCHIVE_EXTENSION + ) + })?; let disk_consistent_lsn = Lsn::from_hex(lsn_str).with_context(|| { format!( "Archive '{}' has an invalid disk consistent lsn in its extension", @@ -374,7 +376,7 @@ async fn write_archive_contents( } let metadata_bytes_written = io::copy(&mut metadata_bytes.as_slice(), &mut archive_input) .await - .with_context(|| "Failed to add metadata into the archive")?; + .context("Failed to add metadata into the archive")?; ensure!( header.metadata_file_size == metadata_bytes_written, "Metadata file was written to the archive incompletely", diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index e4f511e92b..b00b746522 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; -use anyhow::{anyhow, ensure, Context}; +use anyhow::{ensure, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{fs, sync::RwLock}; use tracing::{debug, error, trace, warn}; @@ -189,7 +189,7 @@ async fn try_download_archive< debug!("Downloading archive {:?}", archive_id); let archive_to_download = remote_timeline .archive_data(archive_id) - .ok_or_else(|| anyhow!("Archive {:?} not found in remote storage", archive_id))?; + .with_context(|| format!("Archive {:?} not found in remote storage", archive_id))?; let (archive_header, header_size) = remote_timeline .restore_header(archive_id) .context("Failed to restore header when downloading an archive")?; diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs index f22471dbbe..039ccf8ea0 100644 --- a/pageserver/src/remote_storage/storage_sync/index.rs +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -9,7 +9,7 @@ use std::{ path::{Path, PathBuf}, }; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{bail, ensure, Context}; use serde::{Deserialize, Serialize}; use tracing::debug; use zenith_utils::{ @@ -214,7 +214,7 @@ impl RemoteTimeline { let archive = self .checkpoint_archives .get(&archive_id) - .ok_or_else(|| anyhow!("Archive {:?} not found", archive_id))?; + .with_context(|| format!("Archive {:?} not found", archive_id))?; let mut header_files = Vec::with_capacity(archive.files.len()); for (expected_archive_position, archive_file) in archive.files.iter().enumerate() { @@ -226,11 +226,10 @@ impl RemoteTimeline { archive_id, ); - let timeline_file = self.timeline_files.get(archive_file).ok_or_else(|| { - anyhow!( + let timeline_file = self.timeline_files.get(archive_file).with_context(|| { + format!( "File with id {:?} not found for archive {:?}", - archive_file, - archive_id + archive_file, archive_id ) })?; header_files.push(timeline_file.clone()); @@ -299,7 +298,7 @@ fn try_parse_index_entry( })? .iter() .next() - .ok_or_else(|| anyhow!("Found no tenant id in path '{}'", path.display()))? + .with_context(|| format!("Found no tenant id in path '{}'", path.display()))? .to_string_lossy() .parse::() .with_context(|| format!("Failed to parse tenant id from path '{}'", path.display()))?; @@ -321,8 +320,8 @@ fn try_parse_index_entry( let mut segments = timelines_subpath.iter(); let timeline_id = segments .next() - .ok_or_else(|| { - anyhow!( + .with_context(|| { + format!( "{} directory of tenant {} (path '{}') is not an index entry", TIMELINES_SEGMENT_NAME, tenant_id, @@ -345,7 +344,7 @@ fn try_parse_index_entry( let archive_name = path .file_name() - .ok_or_else(|| anyhow!("Archive '{}' has no file name", path.display()))? + .with_context(|| format!("Archive '{}' has no file name", path.display()))? .to_string_lossy() .to_string(); diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 7b8e8fe373..c1d06be4ae 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -9,7 +9,7 @@ use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::walredo::PostgresRedoManager; use crate::CheckpointConfig; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use lazy_static::lazy_static; use log::*; use serde::{Deserialize, Serialize}; @@ -208,7 +208,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re let mut m = access_tenants(); let tenant = m .get_mut(&tenantid) - .ok_or_else(|| anyhow!("Tenant not found for id {}", tenantid))?; + .with_context(|| format!("Tenant not found for id {}", tenantid))?; info!("activating tenant {}", tenantid); @@ -251,7 +251,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result Waiters { .lock() .unwrap() .remove(key) - .ok_or_else(|| anyhow!("key {} not found", key))?; + .with_context(|| format!("key {} not found", key))?; tx.send(value).context("channel hangup") } } diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 23919d34d7..d85a70cd01 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -143,7 +143,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(LOCK_FILE_NAME); - let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?; + let lock_file = File::create(&lock_file_path).context("failed to open lockfile")?; lock_file.try_lock_exclusive().with_context(|| { format!( "control file {} is locked by some other process", diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs index 41c82b3bec..7619e2c876 100644 --- a/walkeeper/src/callmemaybe.rs +++ b/walkeeper/src/callmemaybe.rs @@ -6,8 +6,7 @@ //! from the call list. //! use crate::SafeKeeperConf; -use anyhow::anyhow; -use anyhow::Result; +use anyhow::{Context, Result}; use std::collections::HashMap; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -198,7 +197,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { - match request.ok_or_else(|| anyhow!("done"))? + match request.context("done")? { CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) => { diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index 31ae848bae..6d248eb0c5 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -6,7 +6,7 @@ use crate::receive_wal::ReceiveWalConn; use crate::send_wal::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; use crate::SafeKeeperConf; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use postgres_ffi::xlog_utils::PG_TLI; use regex::Regex; @@ -56,16 +56,15 @@ fn parse_cmd(cmd: &str) -> Result { let start_lsn = caps .next() .map(|cap| cap[1].parse::()) - .ok_or_else(|| anyhow!("failed to parse start LSN from START_REPLICATION command"))??; + .context("failed to parse start LSN from START_REPLICATION command")??; Ok(SafekeeperPostgresCommand::StartReplication { start_lsn }) } else if cmd.starts_with("IDENTIFY_SYSTEM") { Ok(SafekeeperPostgresCommand::IdentifySystem) } else if cmd.starts_with("JSON_CTRL") { - let cmd = cmd - .strip_prefix("JSON_CTRL") - .ok_or_else(|| anyhow!("invalid prefix"))?; - let parsed_cmd: AppendLogicalMessage = serde_json::from_str(cmd)?; - Ok(SafekeeperPostgresCommand::JSONCtrl { cmd: parsed_cmd }) + let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?; + Ok(SafekeeperPostgresCommand::JSONCtrl { + cmd: serde_json::from_str(cmd)?, + }) } else { bail!("unsupported command {}", cmd); } @@ -104,12 +103,8 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { | SafekeeperPostgresCommand::StartReplication { .. } | SafekeeperPostgresCommand::IdentifySystem | SafekeeperPostgresCommand::JSONCtrl { .. } => { - let tenantid = self - .ztenantid - .ok_or_else(|| anyhow!("tenantid is required"))?; - let timelineid = self - .ztimelineid - .ok_or_else(|| anyhow!("timelineid is required"))?; + let tenantid = self.ztenantid.context("tenantid is required")?; + let timelineid = self.ztimelineid.context("timelineid is required")?; if self.timeline.is_none() { // START_WAL_PUSH is the only command that initializes the timeline in production. // There is also JSON_CTRL command, which should initialize the timeline for testing. @@ -128,12 +123,12 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => { ReceiveWalConn::new(pgb, pageserver_connstr) .run(self) - .with_context(|| "failed to run ReceiveWalConn")?; + .context("failed to run ReceiveWalConn")?; } SafekeeperPostgresCommand::StartReplication { start_lsn } => { ReplicationConn::new(pgb) .run(self, pgb, start_lsn) - .with_context(|| "failed to run ReplicationConn")?; + .context("failed to run ReplicationConn")?; } SafekeeperPostgresCommand::IdentifySystem => { self.handle_identify_system(pgb)?; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index e50fec76f7..3378833dc1 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -137,7 +137,7 @@ impl<'pg> ReceiveWalConn<'pg> { .timeline .get() .process_msg(&msg) - .with_context(|| "failed to process ProposerAcceptorMessage")?; + .context("failed to process ProposerAcceptorMessage")?; if let Some(reply) = reply { self.write_msg(&reply)?; } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 72a0d4bcf3..1a8633edd0 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -1,13 +1,8 @@ //! Acceptor part of proposer-acceptor consensus algorithm. -use anyhow::Context; -use anyhow::{anyhow, bail, Result}; -use byteorder::LittleEndian; -use byteorder::ReadBytesExt; -use bytes::Buf; -use bytes::BufMut; -use bytes::Bytes; -use bytes::BytesMut; +use anyhow::{bail, Context, Result}; +use byteorder::{LittleEndian, ReadBytesExt}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; @@ -346,7 +341,7 @@ impl ProposerAcceptorMessage { let rec_size = hdr .end_lsn .checked_sub(hdr.begin_lsn) - .ok_or_else(|| anyhow!("begin_lsn > end_lsn in AppendRequest"))? + .context("begin_lsn > end_lsn in AppendRequest")? .0 as usize; if rec_size > MAX_SEND_SIZE { bail!( @@ -362,7 +357,7 @@ impl ProposerAcceptorMessage { Ok(ProposerAcceptorMessage::AppendRequest(msg)) } - _ => Err(anyhow!("unknown proposer-acceptor message tag: {}", tag,)), + _ => bail!("unknown proposer-acceptor message tag: {}", tag,), } } } @@ -567,7 +562,7 @@ where self.s.server.wal_seg_size = msg.wal_seg_size; self.storage .persist(&self.s) - .with_context(|| "failed to persist shared state")?; + .context("failed to persist shared state")?; self.metrics = SafeKeeperMetrics::new(self.s.server.timeline_id); diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 6fd156f868..32d030d006 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -3,7 +3,7 @@ use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use postgres_ffi::xlog_utils::{ get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, @@ -156,7 +156,7 @@ impl ReplicationConn { // Shutdown the connection, because rust-postgres client cannot be dropped // when connection is alive. let _ = stream_in.shutdown(Shutdown::Both); - return Err(anyhow!("Copy failed")); + bail!("Copy failed"); } _ => { // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index c7217b97e8..3e83c0151a 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -140,7 +140,7 @@ impl SharedState { create: CreateControlFile, ) -> Result { let state = FileStorage::load_control_file_conf(conf, timeline_id, create) - .with_context(|| "failed to load from control file")?; + .context("failed to load from control file")?; let file_storage = FileStorage::new(timeline_id, conf); let flush_lsn = if state.server.wal_seg_size != 0 { let wal_dir = conf.timeline_dir(&timeline_id); @@ -377,7 +377,7 @@ impl GlobalTimelines { fs::create_dir_all(timeline_id.to_string())?; let shared_state = SharedState::create_restore(conf, timeline_id, create) - .with_context(|| "failed to restore shared state")?; + .context("failed to restore shared state")?; let new_tli = Arc::new(Timeline::new(timeline_id, shared_state)); timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli)); @@ -470,7 +470,7 @@ impl FileStorage { let mut buf = Vec::new(); control_file .read_to_end(&mut buf) - .with_context(|| "failed to read control file")?; + .context("failed to read control file")?; let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); @@ -554,7 +554,7 @@ impl Storage for FileStorage { // fsync the directory (linux specific) File::open(&self.timeline_dir) .and_then(|f| f.sync_all()) - .with_context(|| "failed to sync control file directory")?; + .context("failed to sync control file directory")?; Ok(()) } diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 293b4ef464..976cbfac0c 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -1,5 +1,4 @@ -use anyhow::{anyhow, bail}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env; @@ -271,7 +270,7 @@ fn print_branches_tree(branches: Vec) -> Result<()> { if let Some(tid) = &branch.ancestor_id { branches_hash .get_mut(tid) - .with_context(|| "missing branch info in the HashMap")? + .context("missing branch info in the HashMap")? .children .push(branch.timeline_id.to_string()); } @@ -310,7 +309,7 @@ fn print_branch( .info .ancestor_lsn .as_ref() - .with_context(|| "missing branch info in the HashMap")?; + .context("missing branch info in the HashMap")?; let mut br_sym = "┣━"; // Draw each nesting padding with proper style @@ -356,7 +355,7 @@ fn print_branch( &is_last_new, branches .get(child) - .with_context(|| "missing branch info in the HashMap")?, + .context("missing branch info in the HashMap")?, branches, )?; } @@ -402,10 +401,10 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> { default_conf() }; - let mut env = LocalEnv::create_config(&toml_file) - .with_context(|| "Failed to create zenith configuration")?; + let mut env = + LocalEnv::create_config(&toml_file).context("Failed to create zenith configuration")?; env.init() - .with_context(|| "Failed to initialize zenith repository")?; + .context("Failed to initialize zenith repository")?; // Call 'pageserver init'. let pageserver = PageServerNode::from_env(&env); @@ -462,7 +461,7 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result if let Some(branchname) = branch_match.value_of("branchname") { let startpoint_str = branch_match .value_of("start-point") - .ok_or_else(|| anyhow!("Missing start-point"))?; + .context("Missing start-point")?; let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?; println!( "Created branch '{}' at {:?} for tenant: {}", @@ -574,7 +573,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane .nodes .get(&(tenantid, node_name.to_owned())) - .ok_or_else(|| anyhow!("postgres {} is not found", node_name))?; + .with_context(|| format!("postgres {} is not found", node_name))?; node.stop(destroy)?; } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index d55ead93bc..ad69d629b7 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -5,7 +5,7 @@ use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket}; use crate::sock_split::{BidiStream, ReadStream, WriteStream}; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use rand::Rng; use serde::{Deserialize, Serialize}; @@ -206,7 +206,7 @@ impl PostgresBackend { fn get_stream_in(&mut self) -> Result<&mut BidiStream> { match &mut self.stream { Some(Stream::Bidirectional(stream)) => Ok(stream), - _ => Err(anyhow!("reader taken")), + _ => bail!("reader taken"), } } @@ -399,9 +399,7 @@ impl PostgresBackend { match self.auth_type { AuthType::Trust => unreachable!(), AuthType::MD5 => { - let (_, md5_response) = m - .split_last() - .ok_or_else(|| anyhow!("protocol violation"))?; + let (_, md5_response) = m.split_last().context("protocol violation")?; if let Err(e) = handler.check_auth_md5(self, md5_response) { self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; @@ -409,9 +407,7 @@ impl PostgresBackend { } } AuthType::ZenithJWT => { - let (_, jwt_response) = m - .split_last() - .ok_or_else(|| anyhow!("protocol violation"))?; + let (_, jwt_response) = m.split_last().context("protocol violation")?; if let Err(e) = handler.check_auth_jwt(self, jwt_response) { self.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index 3ad4f41ee2..b8b92444ac 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -2,7 +2,7 @@ //! //! on message formats. -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{bail, ensure, Context, Result}; use byteorder::{BigEndian, ByteOrder}; use byteorder::{ReadBytesExt, BE}; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -122,7 +122,7 @@ impl FeMessage { // The message length includes itself, so it better be at least 4 let bodylen = len .checked_sub(4) - .ok_or_else(|| anyhow!("invalid message length: parsing u32"))?; + .context("invalid message length: parsing u32")?; // Read message body let mut body_buf: Vec = vec![0; bodylen as usize]; @@ -144,7 +144,7 @@ impl FeMessage { b'c' => Ok(Some(FeMessage::CopyDone)), b'f' => Ok(Some(FeMessage::CopyFail)), b'p' => Ok(Some(FeMessage::PasswordMessage(body))), - tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)), + tag => bail!("unknown message tag: {},'{:?}'", tag, body), } } } @@ -203,9 +203,9 @@ impl FeStartupPacket { let mut params_tokens = params_str.split('\0'); let mut params: HashMap = HashMap::new(); while let Some(name) = params_tokens.next() { - let value = params_tokens.next().ok_or_else(|| { - anyhow!("expected even number of params in StartupMessage") - })?; + let value = params_tokens + .next() + .context("expected even number of params in StartupMessage")?; if name == "options" { // deprecated way of passing params as cmd line args for cmdopt in value.split(' ') {